]> git.proxmox.com Git - proxmox-backup.git/commitdiff
move src/tools/daemon.rs to proxmox-rest-server workspace
authorDietmar Maurer <dietmar@proxmox.com>
Tue, 21 Sep 2021 05:58:41 +0000 (07:58 +0200)
committerThomas Lamprecht <t.lamprecht@proxmox.com>
Tue, 21 Sep 2021 06:46:41 +0000 (08:46 +0200)
Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
proxmox-rest-server/Cargo.toml
proxmox-rest-server/src/daemon.rs [new file with mode: 0644]
proxmox-rest-server/src/lib.rs
src/api2/node/mod.rs
src/bin/proxmox-backup-api.rs
src/bin/proxmox-backup-proxy.rs
src/tools/daemon.rs [deleted file]
src/tools/mod.rs

index 33ed6f39b3b3d20cc2facadc3937f6e471f23f23..4d1f1459d7fcfa13ef383d59cd985f6fe3dd9f48 100644 (file)
@@ -13,6 +13,7 @@ http = "0.2"
 hyper = { version = "0.14", features = [ "full" ] }
 lazy_static = "1.4"
 libc = "0.2"
+log = "0.4"
 nix = "0.19.1"
 serde = { version = "1.0", features = [] }
 serde_json = "1.0"
diff --git a/proxmox-rest-server/src/daemon.rs b/proxmox-rest-server/src/daemon.rs
new file mode 100644 (file)
index 0000000..5401e30
--- /dev/null
@@ -0,0 +1,378 @@
+//! Helpers for daemons/services.
+
+use std::ffi::CString;
+use std::future::Future;
+use std::io::{Read, Write};
+use std::os::raw::{c_char, c_uchar, c_int};
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+use std::os::unix::ffi::OsStrExt;
+use std::panic::UnwindSafe;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::path::PathBuf;
+
+use anyhow::{bail, format_err, Error};
+use futures::future::{self, Either};
+
+use proxmox::tools::io::{ReadExt, WriteExt};
+use proxmox::tools::fd::Fd;
+
+use crate::fd_change_cloexec;
+
+#[link(name = "systemd")]
+extern "C" {
+    fn sd_journal_stream_fd(identifier: *const c_uchar, priority: c_int, level_prefix: c_int) -> c_int;
+}
+
+// Unfortunately FnBox is nightly-only and Box<FnOnce> is unusable, so just use Box<Fn>...
+pub type BoxedStoreFunc = Box<dyn FnMut() -> Result<String, Error> + UnwindSafe + Send>;
+
+/// Helper trait to "store" something in the environment to be re-used after re-executing the
+/// service on a reload.
+pub trait Reloadable: Sized {
+    fn restore(var: &str) -> Result<Self, Error>;
+    fn get_store_func(&self) -> Result<BoxedStoreFunc, Error>;
+}
+
+/// Manages things to be stored and reloaded upon reexec.
+/// Anything which should be restorable should be instantiated via this struct's `restore` method,
+#[derive(Default)]
+pub struct Reloader {
+    pre_exec: Vec<PreExecEntry>,
+    self_exe: PathBuf,
+}
+
+// Currently we only need environment variables for storage, but in theory we could also add
+// variants which need temporary files or pipes...
+struct PreExecEntry {
+    name: &'static str, // Feel free to change to String if necessary...
+    store_fn: BoxedStoreFunc,
+}
+
+impl Reloader {
+    pub fn new() -> Result<Self, Error> {
+        Ok(Self {
+            pre_exec: Vec::new(),
+
+            // Get the path to our executable as PathBuf
+            self_exe: std::fs::read_link("/proc/self/exe")?,
+        })
+    }
+
+    /// Restore an object from an environment variable of the given name, or, if none exists, uses
+    /// the function provided in the `or_create` parameter to instantiate the new "first" instance.
+    ///
+    /// Values created via this method will be remembered for later re-execution.
+    pub async fn restore<T, F, U>(&mut self, name: &'static str, or_create: F) -> Result<T, Error>
+    where
+        T: Reloadable,
+        F: FnOnce() -> U,
+        U: Future<Output = Result<T, Error>>,
+    {
+        let res = match std::env::var(name) {
+            Ok(varstr) => T::restore(&varstr)?,
+            Err(std::env::VarError::NotPresent) => or_create().await?,
+            Err(_) => bail!("variable {} has invalid value", name),
+        };
+
+        self.pre_exec.push(PreExecEntry {
+            name,
+            store_fn: res.get_store_func()?,
+        });
+        Ok(res)
+    }
+
+    fn pre_exec(self) -> Result<(), Error> {
+        for mut item in self.pre_exec {
+            std::env::set_var(item.name, (item.store_fn)()?);
+        }
+        Ok(())
+    }
+
+    pub fn fork_restart(self) -> Result<(), Error> {
+        // Get our parameters as Vec<CString>
+        let args = std::env::args_os();
+        let mut new_args = Vec::with_capacity(args.len());
+        for arg in args {
+            new_args.push(CString::new(arg.as_bytes())?);
+        }
+
+        // Synchronisation pipe:
+        let (pold, pnew) = super::socketpair()?;
+
+        // Start ourselves in the background:
+        use nix::unistd::{fork, ForkResult};
+        match unsafe { fork() } {
+            Ok(ForkResult::Child) => {
+                // Double fork so systemd can supervise us without nagging...
+                match unsafe { fork() } {
+                    Ok(ForkResult::Child) => {
+                        std::mem::drop(pold);
+                        // At this point we call pre-exec helpers. We must be certain that if they fail for
+                        // whatever reason we can still call `_exit()`, so use catch_unwind.
+                        match std::panic::catch_unwind(move || {
+                            let mut pnew = unsafe {
+                                std::fs::File::from_raw_fd(pnew.into_raw_fd())
+                            };
+                            let pid = nix::unistd::Pid::this();
+                            if let Err(e) = unsafe { pnew.write_host_value(pid.as_raw()) } {
+                                log::error!("failed to send new server PID to parent: {}", e);
+                                unsafe {
+                                    libc::_exit(-1);
+                                }
+                            }
+
+                            let mut ok = [0u8];
+                            if let Err(e) = pnew.read_exact(&mut ok) {
+                                log::error!("parent vanished before notifying systemd: {}", e);
+                                unsafe {
+                                    libc::_exit(-1);
+                                }
+                            }
+                            assert_eq!(ok[0], 1, "reload handshake should have sent a 1 byte");
+
+                            std::mem::drop(pnew);
+
+                            // Try to reopen STDOUT/STDERR journald streams to get correct PID in logs
+                            let ident = CString::new(self.self_exe.file_name().unwrap().as_bytes()).unwrap();
+                            let ident = ident.as_bytes();
+                            let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_INFO, 1) };
+                            if fd >= 0 && fd != 1 {
+                                let fd = proxmox::tools::fd::Fd(fd); // add drop handler
+                                nix::unistd::dup2(fd.as_raw_fd(), 1)?;
+                            } else {
+                                log::error!("failed to update STDOUT journal redirection ({})", fd);
+                            }
+                            let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_ERR, 1) };
+                            if fd >= 0 && fd != 2 {
+                                let fd = proxmox::tools::fd::Fd(fd); // add drop handler
+                                nix::unistd::dup2(fd.as_raw_fd(), 2)?;
+                            } else {
+                                log::error!("failed to update STDERR journal redirection ({})", fd);
+                            }
+
+                            self.do_reexec(new_args)
+                        })
+                        {
+                            Ok(Ok(())) => eprintln!("do_reexec returned!"),
+                            Ok(Err(err)) => eprintln!("do_reexec failed: {}", err),
+                            Err(_) => eprintln!("panic in re-exec"),
+                        }
+                    }
+                    Ok(ForkResult::Parent { child }) => {
+                        std::mem::drop((pold, pnew));
+                        log::debug!("forked off a new server (second pid: {})", child);
+                    }
+                    Err(e) => log::error!("fork() failed, restart delayed: {}", e),
+                }
+                // No matter how we managed to get here, this is the time where we bail out quickly:
+                unsafe {
+                    libc::_exit(-1)
+                }
+            }
+            Ok(ForkResult::Parent { child }) => {
+                log::debug!("forked off a new server (first pid: {}), waiting for 2nd pid", child);
+                std::mem::drop(pnew);
+                let mut pold = unsafe {
+                    std::fs::File::from_raw_fd(pold.into_raw_fd())
+                };
+                let child = nix::unistd::Pid::from_raw(match unsafe { pold.read_le_value() } {
+                    Ok(v) => v,
+                    Err(e) => {
+                        log::error!("failed to receive pid of double-forked child process: {}", e);
+                        // systemd will complain but won't kill the service...
+                        return Ok(());
+                    }
+                });
+
+                if let Err(e) = systemd_notify(SystemdNotify::MainPid(child)) {
+                    log::error!("failed to notify systemd about the new main pid: {}", e);
+                }
+
+                // notify child that it is now the new main process:
+                if let Err(e) = pold.write_all(&[1u8]) {
+                    log::error!("child vanished during reload: {}", e);
+                }
+
+                Ok(())
+            }
+            Err(e) => {
+                log::error!("fork() failed, restart delayed: {}", e);
+                Ok(())
+            }
+        }
+    }
+
+    fn do_reexec(self, args: Vec<CString>) -> Result<(), Error> {
+        let exe = CString::new(self.self_exe.as_os_str().as_bytes())?;
+        self.pre_exec()?;
+        nix::unistd::setsid()?;
+        let args: Vec<&std::ffi::CStr> = args.iter().map(|s| s.as_ref()).collect();
+        nix::unistd::execvp(&exe, &args)?;
+        panic!("exec misbehaved");
+    }
+}
+
+// For now all we need to do is store and reuse a tcp listening socket:
+impl Reloadable for tokio::net::TcpListener {
+    // NOTE: The socket must not be closed when the store-function is called:
+    // FIXME: We could become "independent" of the TcpListener and its reference to the file
+    // descriptor by `dup()`ing it (and check if the listener still exists via kcmp()?)
+    fn get_store_func(&self) -> Result<BoxedStoreFunc, Error> {
+        let mut fd_opt = Some(Fd(
+            nix::fcntl::fcntl(self.as_raw_fd(), nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0))?
+        ));
+        Ok(Box::new(move || {
+            let fd = fd_opt.take().unwrap();
+            fd_change_cloexec(fd.as_raw_fd(), false)?;
+            Ok(fd.into_raw_fd().to_string())
+        }))
+    }
+
+    fn restore(var: &str) -> Result<Self, Error> {
+        let fd = var.parse::<u32>()
+            .map_err(|e| format_err!("invalid file descriptor: {}", e))?
+            as RawFd;
+        fd_change_cloexec(fd, true)?;
+        Ok(Self::from_std(
+            unsafe { std::net::TcpListener::from_raw_fd(fd) },
+        )?)
+    }
+}
+
+pub struct NotifyReady;
+
+impl Future for NotifyReady {
+    type Output = Result<(), Error>;
+
+    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Error>> {
+        systemd_notify(SystemdNotify::Ready)?;
+        Poll::Ready(Ok(()))
+    }
+}
+
+/// This creates a future representing a daemon which reloads itself when receiving a SIGHUP.
+/// If this is started regularly, a listening socket is created. In this case, the file descriptor
+/// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`.
+/// If the variable already exists, its contents will instead be used to restore the listening
+/// socket.  The finished listening socket is then passed to the `create_service` function which
+/// can be used to setup the TLS and the HTTP daemon.
+pub async fn create_daemon<F, S>(
+    address: std::net::SocketAddr,
+    create_service: F,
+    service_name: &str,
+) -> Result<(), Error>
+where
+    F: FnOnce(tokio::net::TcpListener, NotifyReady) -> Result<S, Error>,
+    S: Future<Output = ()> + Unpin,
+{
+    let mut reloader = Reloader::new()?;
+
+    let listener: tokio::net::TcpListener = reloader.restore(
+        "PROXMOX_BACKUP_LISTEN_FD",
+        move || async move { Ok(tokio::net::TcpListener::bind(&address).await?) },
+    ).await?;
+
+    let server_future = create_service(listener, NotifyReady)?;
+    let shutdown_future = crate::shutdown_future();
+
+    let finish_future = match future::select(server_future, shutdown_future).await {
+        Either::Left((_, _)) => {
+            crate::request_shutdown(); // make sure we are in shutdown mode
+            None
+        }
+        Either::Right((_, server_future)) => Some(server_future),
+    };
+
+    let mut reloader = Some(reloader);
+
+    if crate::is_reload_request() {
+        log::info!("daemon reload...");
+        if let Err(e) = systemd_notify(SystemdNotify::Reloading) {
+            log::error!("failed to notify systemd about the state change: {}", e);
+        }
+        wait_service_is_state(service_name, "reloading").await?;
+        if let Err(e) = reloader.take().unwrap().fork_restart() {
+            log::error!("error during reload: {}", e);
+            let _ = systemd_notify(SystemdNotify::Status("error during reload".to_string()));
+        }
+    } else {
+        log::info!("daemon shutting down...");
+    }
+
+    if let Some(future) = finish_future {
+        future.await;
+    }
+
+    // FIXME: this is a hack, replace with sd_notify_barrier when available
+    if crate::is_reload_request() {
+        wait_service_is_not_state(service_name, "reloading").await?;
+    }
+
+    log::info!("daemon shut down...");
+    Ok(())
+}
+
+// hack, do not use if unsure!
+async fn get_service_state(service: &str) -> Result<String, Error> {
+    let text = match tokio::process::Command::new("systemctl")
+        .args(&["is-active", service])
+        .output()
+        .await
+    {
+        Ok(output) => match String::from_utf8(output.stdout) {
+            Ok(text) => text,
+            Err(err) => bail!("output of 'systemctl is-active' not valid UTF-8 - {}", err),
+        },
+        Err(err) => bail!("executing 'systemctl is-active' failed - {}", err),
+    };
+
+    Ok(text.trim().trim_start().to_string())
+}
+
+async fn wait_service_is_state(service: &str, state: &str) -> Result<(), Error> {
+    tokio::time::sleep(std::time::Duration::new(1, 0)).await;
+    while get_service_state(service).await? != state {
+        tokio::time::sleep(std::time::Duration::new(5, 0)).await;
+    }
+    Ok(())
+}
+
+async fn wait_service_is_not_state(service: &str, state: &str) -> Result<(), Error> {
+    tokio::time::sleep(std::time::Duration::new(1, 0)).await;
+    while get_service_state(service).await? == state {
+        tokio::time::sleep(std::time::Duration::new(5, 0)).await;
+    }
+    Ok(())
+}
+
+#[link(name = "systemd")]
+extern "C" {
+    fn sd_notify(unset_environment: c_int, state: *const c_char) -> c_int;
+}
+
+pub enum SystemdNotify {
+    Ready,
+    Reloading,
+    Stopping,
+    Status(String),
+    MainPid(nix::unistd::Pid),
+}
+
+pub fn systemd_notify(state: SystemdNotify) -> Result<(), Error> {
+    let message = match state {
+        SystemdNotify::Ready => CString::new("READY=1"),
+        SystemdNotify::Reloading => CString::new("RELOADING=1"),
+        SystemdNotify::Stopping => CString::new("STOPPING=1"),
+        SystemdNotify::Status(msg) => CString::new(format!("STATUS={}", msg)),
+        SystemdNotify::MainPid(pid) => CString::new(format!("MAINPID={}", pid)),
+    }?;
+    let rc = unsafe { sd_notify(0, message.as_ptr()) };
+    if rc < 0 {
+        bail!(
+            "systemd_notify failed: {}",
+            std::io::Error::from_raw_os_error(-rc),
+        );
+    }
+    Ok(())
+}
index 38dd610c0272e994af3538d6714df14fea9fdf24..21a91115968d288a7bc22aa769c2cd14698f2fdc 100644 (file)
@@ -1,4 +1,10 @@
-use anyhow::{bail, Error};
+use std::os::unix::io::RawFd;
+
+use anyhow::{bail, format_err, Error};
+
+use proxmox::tools::fd::Fd;
+
+pub mod daemon;
 
 mod state;
 pub use state::*;
@@ -52,3 +58,26 @@ pub fn fail_on_shutdown() -> Result<(), Error> {
     Ok(())
 }
 
+/// Helper to set/clear the FD_CLOEXEC flag on file descriptors
+pub fn fd_change_cloexec(fd: RawFd, on: bool) -> Result<(), Error> {
+    use nix::fcntl::{fcntl, FdFlag, F_GETFD, F_SETFD};
+    let mut flags = FdFlag::from_bits(fcntl(fd, F_GETFD)?)
+        .ok_or_else(|| format_err!("unhandled file flags"))?; // nix crate is stupid this way...
+    flags.set(FdFlag::FD_CLOEXEC, on);
+    fcntl(fd, F_SETFD(flags))?;
+    Ok(())
+}
+
+/// safe wrapper for `nix::sys::socket::socketpair` defaulting to `O_CLOEXEC` and guarding the file
+/// descriptors.
+pub fn socketpair() -> Result<(Fd, Fd), Error> {
+    use nix::sys::socket;
+    let (pa, pb) = socket::socketpair(
+        socket::AddressFamily::Unix,
+        socket::SockType::Stream,
+        None,
+        socket::SockFlag::SOCK_CLOEXEC,
+    )?;
+    Ok((Fd(pa), Fd(pb)))
+}
+
index ecc1e2e083768d931ce00c53e9c9a21ec106ea77..9b31d59525fb8f7d12a5b54eb22e4366df50bde8 100644 (file)
@@ -151,7 +151,7 @@ async fn termproxy(cmd: Option<String>, rpcenv: &mut dyn RpcEnvironment) -> Resu
         move |worker| async move {
             // move inside the worker so that it survives and does not close the port
             // remove CLOEXEC from listenere so that we can reuse it in termproxy
-            tools::fd_change_cloexec(listener.as_raw_fd(), false)?;
+            proxmox_rest_server::fd_change_cloexec(listener.as_raw_fd(), false)?;
 
             let mut arguments: Vec<&str> = Vec::new();
             let fd_string = listener.as_raw_fd().to_string();
index 452bbc3a04e33a803d77e2c8e37015ae4a9b8804..17b6f1844dea3ce3b6e12c700786f2fed0345018 100644 (file)
@@ -13,7 +13,8 @@ use proxmox_backup::server::{
     auth::default_api_auth,
     rest::*,
 };
-use proxmox_backup::tools::daemon;
+use proxmox_rest_server::daemon;
+
 use proxmox_backup::auth_helpers::*;
 use proxmox_backup::config;
 
index de534a659d397313d4b4eb1746c42623d32b54c5..d4ac2a8566c7043b571a4483bda0f717c409ceb4 100644 (file)
@@ -39,11 +39,12 @@ use pbs_api_types::{
     PruneOptions,
 };
 
+use proxmox_rest_server::daemon;
+
 use proxmox_backup::server;
 use proxmox_backup::auth_helpers::*;
 use proxmox_backup::tools::{
     PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
-    daemon,
     disks::{
         DiskManage,
         zfs_pool_stats,
diff --git a/src/tools/daemon.rs b/src/tools/daemon.rs
deleted file mode 100644 (file)
index 1291601..0000000
+++ /dev/null
@@ -1,377 +0,0 @@
-//! Helpers for daemons/services.
-
-use std::ffi::CString;
-use std::future::Future;
-use std::io::{Read, Write};
-use std::os::raw::{c_char, c_uchar, c_int};
-use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
-use std::os::unix::ffi::OsStrExt;
-use std::panic::UnwindSafe;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-use std::path::PathBuf;
-
-use anyhow::{bail, format_err, Error};
-use futures::future::{self, Either};
-
-use proxmox::tools::io::{ReadExt, WriteExt};
-
-use crate::tools::{fd_change_cloexec, self};
-
-#[link(name = "systemd")]
-extern "C" {
-    fn sd_journal_stream_fd(identifier: *const c_uchar, priority: c_int, level_prefix: c_int) -> c_int;
-}
-
-// Unfortunately FnBox is nightly-only and Box<FnOnce> is unusable, so just use Box<Fn>...
-pub type BoxedStoreFunc = Box<dyn FnMut() -> Result<String, Error> + UnwindSafe + Send>;
-
-/// Helper trait to "store" something in the environment to be re-used after re-executing the
-/// service on a reload.
-pub trait Reloadable: Sized {
-    fn restore(var: &str) -> Result<Self, Error>;
-    fn get_store_func(&self) -> Result<BoxedStoreFunc, Error>;
-}
-
-/// Manages things to be stored and reloaded upon reexec.
-/// Anything which should be restorable should be instantiated via this struct's `restore` method,
-#[derive(Default)]
-pub struct Reloader {
-    pre_exec: Vec<PreExecEntry>,
-    self_exe: PathBuf,
-}
-
-// Currently we only need environment variables for storage, but in theory we could also add
-// variants which need temporary files or pipes...
-struct PreExecEntry {
-    name: &'static str, // Feel free to change to String if necessary...
-    store_fn: BoxedStoreFunc,
-}
-
-impl Reloader {
-    pub fn new() -> Result<Self, Error> {
-        Ok(Self {
-            pre_exec: Vec::new(),
-
-            // Get the path to our executable as PathBuf
-            self_exe: std::fs::read_link("/proc/self/exe")?,
-        })
-    }
-
-    /// Restore an object from an environment variable of the given name, or, if none exists, uses
-    /// the function provided in the `or_create` parameter to instantiate the new "first" instance.
-    ///
-    /// Values created via this method will be remembered for later re-execution.
-    pub async fn restore<T, F, U>(&mut self, name: &'static str, or_create: F) -> Result<T, Error>
-    where
-        T: Reloadable,
-        F: FnOnce() -> U,
-        U: Future<Output = Result<T, Error>>,
-    {
-        let res = match std::env::var(name) {
-            Ok(varstr) => T::restore(&varstr)?,
-            Err(std::env::VarError::NotPresent) => or_create().await?,
-            Err(_) => bail!("variable {} has invalid value", name),
-        };
-
-        self.pre_exec.push(PreExecEntry {
-            name,
-            store_fn: res.get_store_func()?,
-        });
-        Ok(res)
-    }
-
-    fn pre_exec(self) -> Result<(), Error> {
-        for mut item in self.pre_exec {
-            std::env::set_var(item.name, (item.store_fn)()?);
-        }
-        Ok(())
-    }
-
-    pub fn fork_restart(self) -> Result<(), Error> {
-        // Get our parameters as Vec<CString>
-        let args = std::env::args_os();
-        let mut new_args = Vec::with_capacity(args.len());
-        for arg in args {
-            new_args.push(CString::new(arg.as_bytes())?);
-        }
-
-        // Synchronisation pipe:
-        let (pold, pnew) = super::socketpair()?;
-
-        // Start ourselves in the background:
-        use nix::unistd::{fork, ForkResult};
-        match unsafe { fork() } {
-            Ok(ForkResult::Child) => {
-                // Double fork so systemd can supervise us without nagging...
-                match unsafe { fork() } {
-                    Ok(ForkResult::Child) => {
-                        std::mem::drop(pold);
-                        // At this point we call pre-exec helpers. We must be certain that if they fail for
-                        // whatever reason we can still call `_exit()`, so use catch_unwind.
-                        match std::panic::catch_unwind(move || {
-                            let mut pnew = unsafe {
-                                std::fs::File::from_raw_fd(pnew.into_raw_fd())
-                            };
-                            let pid = nix::unistd::Pid::this();
-                            if let Err(e) = unsafe { pnew.write_host_value(pid.as_raw()) } {
-                                log::error!("failed to send new server PID to parent: {}", e);
-                                unsafe {
-                                    libc::_exit(-1);
-                                }
-                            }
-
-                            let mut ok = [0u8];
-                            if let Err(e) = pnew.read_exact(&mut ok) {
-                                log::error!("parent vanished before notifying systemd: {}", e);
-                                unsafe {
-                                    libc::_exit(-1);
-                                }
-                            }
-                            assert_eq!(ok[0], 1, "reload handshake should have sent a 1 byte");
-
-                            std::mem::drop(pnew);
-
-                            // Try to reopen STDOUT/STDERR journald streams to get correct PID in logs
-                            let ident = CString::new(self.self_exe.file_name().unwrap().as_bytes()).unwrap();
-                            let ident = ident.as_bytes();
-                            let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_INFO, 1) };
-                            if fd >= 0 && fd != 1 {
-                                let fd = proxmox::tools::fd::Fd(fd); // add drop handler
-                                nix::unistd::dup2(fd.as_raw_fd(), 1)?;
-                            } else {
-                                log::error!("failed to update STDOUT journal redirection ({})", fd);
-                            }
-                            let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_ERR, 1) };
-                            if fd >= 0 && fd != 2 {
-                                let fd = proxmox::tools::fd::Fd(fd); // add drop handler
-                                nix::unistd::dup2(fd.as_raw_fd(), 2)?;
-                            } else {
-                                log::error!("failed to update STDERR journal redirection ({})", fd);
-                            }
-
-                            self.do_reexec(new_args)
-                        })
-                        {
-                            Ok(Ok(())) => eprintln!("do_reexec returned!"),
-                            Ok(Err(err)) => eprintln!("do_reexec failed: {}", err),
-                            Err(_) => eprintln!("panic in re-exec"),
-                        }
-                    }
-                    Ok(ForkResult::Parent { child }) => {
-                        std::mem::drop((pold, pnew));
-                        log::debug!("forked off a new server (second pid: {})", child);
-                    }
-                    Err(e) => log::error!("fork() failed, restart delayed: {}", e),
-                }
-                // No matter how we managed to get here, this is the time where we bail out quickly:
-                unsafe {
-                    libc::_exit(-1)
-                }
-            }
-            Ok(ForkResult::Parent { child }) => {
-                log::debug!("forked off a new server (first pid: {}), waiting for 2nd pid", child);
-                std::mem::drop(pnew);
-                let mut pold = unsafe {
-                    std::fs::File::from_raw_fd(pold.into_raw_fd())
-                };
-                let child = nix::unistd::Pid::from_raw(match unsafe { pold.read_le_value() } {
-                    Ok(v) => v,
-                    Err(e) => {
-                        log::error!("failed to receive pid of double-forked child process: {}", e);
-                        // systemd will complain but won't kill the service...
-                        return Ok(());
-                    }
-                });
-
-                if let Err(e) = systemd_notify(SystemdNotify::MainPid(child)) {
-                    log::error!("failed to notify systemd about the new main pid: {}", e);
-                }
-
-                // notify child that it is now the new main process:
-                if let Err(e) = pold.write_all(&[1u8]) {
-                    log::error!("child vanished during reload: {}", e);
-                }
-
-                Ok(())
-            }
-            Err(e) => {
-                log::error!("fork() failed, restart delayed: {}", e);
-                Ok(())
-            }
-        }
-    }
-
-    fn do_reexec(self, args: Vec<CString>) -> Result<(), Error> {
-        let exe = CString::new(self.self_exe.as_os_str().as_bytes())?;
-        self.pre_exec()?;
-        nix::unistd::setsid()?;
-        let args: Vec<&std::ffi::CStr> = args.iter().map(|s| s.as_ref()).collect();
-        nix::unistd::execvp(&exe, &args)?;
-        panic!("exec misbehaved");
-    }
-}
-
-// For now all we need to do is store and reuse a tcp listening socket:
-impl Reloadable for tokio::net::TcpListener {
-    // NOTE: The socket must not be closed when the store-function is called:
-    // FIXME: We could become "independent" of the TcpListener and its reference to the file
-    // descriptor by `dup()`ing it (and check if the listener still exists via kcmp()?)
-    fn get_store_func(&self) -> Result<BoxedStoreFunc, Error> {
-        let mut fd_opt = Some(tools::Fd(
-            nix::fcntl::fcntl(self.as_raw_fd(), nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0))?
-        ));
-        Ok(Box::new(move || {
-            let fd = fd_opt.take().unwrap();
-            fd_change_cloexec(fd.as_raw_fd(), false)?;
-            Ok(fd.into_raw_fd().to_string())
-        }))
-    }
-
-    fn restore(var: &str) -> Result<Self, Error> {
-        let fd = var.parse::<u32>()
-            .map_err(|e| format_err!("invalid file descriptor: {}", e))?
-            as RawFd;
-        fd_change_cloexec(fd, true)?;
-        Ok(Self::from_std(
-            unsafe { std::net::TcpListener::from_raw_fd(fd) },
-        )?)
-    }
-}
-
-pub struct NotifyReady;
-
-impl Future for NotifyReady {
-    type Output = Result<(), Error>;
-
-    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Error>> {
-        systemd_notify(SystemdNotify::Ready)?;
-        Poll::Ready(Ok(()))
-    }
-}
-
-/// This creates a future representing a daemon which reloads itself when receiving a SIGHUP.
-/// If this is started regularly, a listening socket is created. In this case, the file descriptor
-/// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`.
-/// If the variable already exists, its contents will instead be used to restore the listening
-/// socket.  The finished listening socket is then passed to the `create_service` function which
-/// can be used to setup the TLS and the HTTP daemon.
-pub async fn create_daemon<F, S>(
-    address: std::net::SocketAddr,
-    create_service: F,
-    service_name: &str,
-) -> Result<(), Error>
-where
-    F: FnOnce(tokio::net::TcpListener, NotifyReady) -> Result<S, Error>,
-    S: Future<Output = ()> + Unpin,
-{
-    let mut reloader = Reloader::new()?;
-
-    let listener: tokio::net::TcpListener = reloader.restore(
-        "PROXMOX_BACKUP_LISTEN_FD",
-        move || async move { Ok(tokio::net::TcpListener::bind(&address).await?) },
-    ).await?;
-
-    let server_future = create_service(listener, NotifyReady)?;
-    let shutdown_future = proxmox_rest_server::shutdown_future();
-
-    let finish_future = match future::select(server_future, shutdown_future).await {
-        Either::Left((_, _)) => {
-            proxmox_rest_server::request_shutdown(); // make sure we are in shutdown mode
-            None
-        }
-        Either::Right((_, server_future)) => Some(server_future),
-    };
-
-    let mut reloader = Some(reloader);
-
-    if proxmox_rest_server::is_reload_request() {
-        log::info!("daemon reload...");
-        if let Err(e) = systemd_notify(SystemdNotify::Reloading) {
-            log::error!("failed to notify systemd about the state change: {}", e);
-        }
-        wait_service_is_state(service_name, "reloading").await?;
-        if let Err(e) = reloader.take().unwrap().fork_restart() {
-            log::error!("error during reload: {}", e);
-            let _ = systemd_notify(SystemdNotify::Status("error during reload".to_string()));
-        }
-    } else {
-        log::info!("daemon shutting down...");
-    }
-
-    if let Some(future) = finish_future {
-        future.await;
-    }
-
-    // FIXME: this is a hack, replace with sd_notify_barrier when available
-    if proxmox_rest_server::is_reload_request() {
-        wait_service_is_not_state(service_name, "reloading").await?;
-    }
-
-    log::info!("daemon shut down...");
-    Ok(())
-}
-
-// hack, do not use if unsure!
-async fn get_service_state(service: &str) -> Result<String, Error> {
-    let text = match tokio::process::Command::new("systemctl")
-        .args(&["is-active", service])
-        .output()
-        .await
-    {
-        Ok(output) => match String::from_utf8(output.stdout) {
-            Ok(text) => text,
-            Err(err) => bail!("output of 'systemctl is-active' not valid UTF-8 - {}", err),
-        },
-        Err(err) => bail!("executing 'systemctl is-active' failed - {}", err),
-    };
-
-    Ok(text.trim().trim_start().to_string())
-}
-
-async fn wait_service_is_state(service: &str, state: &str) -> Result<(), Error> {
-    tokio::time::sleep(std::time::Duration::new(1, 0)).await;
-    while get_service_state(service).await? != state {
-        tokio::time::sleep(std::time::Duration::new(5, 0)).await;
-    }
-    Ok(())
-}
-
-async fn wait_service_is_not_state(service: &str, state: &str) -> Result<(), Error> {
-    tokio::time::sleep(std::time::Duration::new(1, 0)).await;
-    while get_service_state(service).await? == state {
-        tokio::time::sleep(std::time::Duration::new(5, 0)).await;
-    }
-    Ok(())
-}
-
-#[link(name = "systemd")]
-extern "C" {
-    fn sd_notify(unset_environment: c_int, state: *const c_char) -> c_int;
-}
-
-pub enum SystemdNotify {
-    Ready,
-    Reloading,
-    Stopping,
-    Status(String),
-    MainPid(nix::unistd::Pid),
-}
-
-pub fn systemd_notify(state: SystemdNotify) -> Result<(), Error> {
-    let message = match state {
-        SystemdNotify::Ready => CString::new("READY=1"),
-        SystemdNotify::Reloading => CString::new("RELOADING=1"),
-        SystemdNotify::Stopping => CString::new("STOPPING=1"),
-        SystemdNotify::Status(msg) => CString::new(format!("STATUS={}", msg)),
-        SystemdNotify::MainPid(pid) => CString::new(format!("MAINPID={}", pid)),
-    }?;
-    let rc = unsafe { sd_notify(0, message.as_ptr()) };
-    if rc < 0 {
-        bail!(
-            "systemd_notify failed: {}",
-            std::io::Error::from_raw_os_error(-rc),
-        );
-    }
-    Ok(())
-}
index f8b363f549958af0cf4160c3ae47479550b1cebf..8fd441b5bb699e8a67b29ca3fd852441517e4ded 100644 (file)
@@ -2,13 +2,10 @@
 //!
 //! This is a collection of small and useful tools.
 use std::any::Any;
-use std::os::unix::io::RawFd;
 
 use anyhow::{bail, format_err, Error};
 use openssl::hash::{hash, DigestBytes, MessageDigest};
 
-pub use proxmox::tools::fd::Fd;
-
 use proxmox_http::{
     client::SimpleHttp,
     client::SimpleHttpOptions,
@@ -19,7 +16,6 @@ pub mod apt;
 pub mod async_io;
 pub mod compression;
 pub mod config;
-pub mod daemon;
 pub mod disks;
 
 pub mod serde_filter;
@@ -111,29 +107,6 @@ pub fn normalize_uri_path(path: &str) -> Result<(String, Vec<&str>), Error> {
     Ok((path, components))
 }
 
-pub fn fd_change_cloexec(fd: RawFd, on: bool) -> Result<(), Error> {
-    use nix::fcntl::{fcntl, FdFlag, F_GETFD, F_SETFD};
-    let mut flags = FdFlag::from_bits(fcntl(fd, F_GETFD)?)
-        .ok_or_else(|| format_err!("unhandled file flags"))?; // nix crate is stupid this way...
-    flags.set(FdFlag::FD_CLOEXEC, on);
-    fcntl(fd, F_SETFD(flags))?;
-    Ok(())
-}
-
-/// safe wrapper for `nix::sys::socket::socketpair` defaulting to `O_CLOEXEC` and guarding the file
-/// descriptors.
-pub fn socketpair() -> Result<(Fd, Fd), Error> {
-    use nix::sys::socket;
-    let (pa, pb) = socket::socketpair(
-        socket::AddressFamily::Unix,
-        socket::SockType::Stream,
-        None,
-        socket::SockFlag::SOCK_CLOEXEC,
-    )?;
-    Ok((Fd(pa), Fd(pb)))
-}
-
-
 /// An easy way to convert types to Any
 ///
 /// Mostly useful to downcast trait objects (see RpcEnvironment).