hyper = { version = "0.14", features = [ "full" ] }
lazy_static = "1.4"
libc = "0.2"
+log = "0.4"
nix = "0.19.1"
serde = { version = "1.0", features = [] }
serde_json = "1.0"
--- /dev/null
+//! Helpers for daemons/services.
+
+use std::ffi::CString;
+use std::future::Future;
+use std::io::{Read, Write};
+use std::os::raw::{c_char, c_uchar, c_int};
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+use std::os::unix::ffi::OsStrExt;
+use std::panic::UnwindSafe;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::path::PathBuf;
+
+use anyhow::{bail, format_err, Error};
+use futures::future::{self, Either};
+
+use proxmox::tools::io::{ReadExt, WriteExt};
+use proxmox::tools::fd::Fd;
+
+use crate::fd_change_cloexec;
+
+#[link(name = "systemd")]
+extern "C" {
+ fn sd_journal_stream_fd(identifier: *const c_uchar, priority: c_int, level_prefix: c_int) -> c_int;
+}
+
+// Unfortunately FnBox is nightly-only and Box<FnOnce> is unusable, so just use Box<Fn>...
+pub type BoxedStoreFunc = Box<dyn FnMut() -> Result<String, Error> + UnwindSafe + Send>;
+
+/// Helper trait to "store" something in the environment to be re-used after re-executing the
+/// service on a reload.
+pub trait Reloadable: Sized {
+ fn restore(var: &str) -> Result<Self, Error>;
+ fn get_store_func(&self) -> Result<BoxedStoreFunc, Error>;
+}
+
+/// Manages things to be stored and reloaded upon reexec.
+/// Anything which should be restorable should be instantiated via this struct's `restore` method,
+#[derive(Default)]
+pub struct Reloader {
+ pre_exec: Vec<PreExecEntry>,
+ self_exe: PathBuf,
+}
+
+// Currently we only need environment variables for storage, but in theory we could also add
+// variants which need temporary files or pipes...
+struct PreExecEntry {
+ name: &'static str, // Feel free to change to String if necessary...
+ store_fn: BoxedStoreFunc,
+}
+
+impl Reloader {
+ pub fn new() -> Result<Self, Error> {
+ Ok(Self {
+ pre_exec: Vec::new(),
+
+ // Get the path to our executable as PathBuf
+ self_exe: std::fs::read_link("/proc/self/exe")?,
+ })
+ }
+
+ /// Restore an object from an environment variable of the given name, or, if none exists, uses
+ /// the function provided in the `or_create` parameter to instantiate the new "first" instance.
+ ///
+ /// Values created via this method will be remembered for later re-execution.
+ pub async fn restore<T, F, U>(&mut self, name: &'static str, or_create: F) -> Result<T, Error>
+ where
+ T: Reloadable,
+ F: FnOnce() -> U,
+ U: Future<Output = Result<T, Error>>,
+ {
+ let res = match std::env::var(name) {
+ Ok(varstr) => T::restore(&varstr)?,
+ Err(std::env::VarError::NotPresent) => or_create().await?,
+ Err(_) => bail!("variable {} has invalid value", name),
+ };
+
+ self.pre_exec.push(PreExecEntry {
+ name,
+ store_fn: res.get_store_func()?,
+ });
+ Ok(res)
+ }
+
+ fn pre_exec(self) -> Result<(), Error> {
+ for mut item in self.pre_exec {
+ std::env::set_var(item.name, (item.store_fn)()?);
+ }
+ Ok(())
+ }
+
+ pub fn fork_restart(self) -> Result<(), Error> {
+ // Get our parameters as Vec<CString>
+ let args = std::env::args_os();
+ let mut new_args = Vec::with_capacity(args.len());
+ for arg in args {
+ new_args.push(CString::new(arg.as_bytes())?);
+ }
+
+ // Synchronisation pipe:
+ let (pold, pnew) = super::socketpair()?;
+
+ // Start ourselves in the background:
+ use nix::unistd::{fork, ForkResult};
+ match unsafe { fork() } {
+ Ok(ForkResult::Child) => {
+ // Double fork so systemd can supervise us without nagging...
+ match unsafe { fork() } {
+ Ok(ForkResult::Child) => {
+ std::mem::drop(pold);
+ // At this point we call pre-exec helpers. We must be certain that if they fail for
+ // whatever reason we can still call `_exit()`, so use catch_unwind.
+ match std::panic::catch_unwind(move || {
+ let mut pnew = unsafe {
+ std::fs::File::from_raw_fd(pnew.into_raw_fd())
+ };
+ let pid = nix::unistd::Pid::this();
+ if let Err(e) = unsafe { pnew.write_host_value(pid.as_raw()) } {
+ log::error!("failed to send new server PID to parent: {}", e);
+ unsafe {
+ libc::_exit(-1);
+ }
+ }
+
+ let mut ok = [0u8];
+ if let Err(e) = pnew.read_exact(&mut ok) {
+ log::error!("parent vanished before notifying systemd: {}", e);
+ unsafe {
+ libc::_exit(-1);
+ }
+ }
+ assert_eq!(ok[0], 1, "reload handshake should have sent a 1 byte");
+
+ std::mem::drop(pnew);
+
+ // Try to reopen STDOUT/STDERR journald streams to get correct PID in logs
+ let ident = CString::new(self.self_exe.file_name().unwrap().as_bytes()).unwrap();
+ let ident = ident.as_bytes();
+ let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_INFO, 1) };
+ if fd >= 0 && fd != 1 {
+ let fd = proxmox::tools::fd::Fd(fd); // add drop handler
+ nix::unistd::dup2(fd.as_raw_fd(), 1)?;
+ } else {
+ log::error!("failed to update STDOUT journal redirection ({})", fd);
+ }
+ let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_ERR, 1) };
+ if fd >= 0 && fd != 2 {
+ let fd = proxmox::tools::fd::Fd(fd); // add drop handler
+ nix::unistd::dup2(fd.as_raw_fd(), 2)?;
+ } else {
+ log::error!("failed to update STDERR journal redirection ({})", fd);
+ }
+
+ self.do_reexec(new_args)
+ })
+ {
+ Ok(Ok(())) => eprintln!("do_reexec returned!"),
+ Ok(Err(err)) => eprintln!("do_reexec failed: {}", err),
+ Err(_) => eprintln!("panic in re-exec"),
+ }
+ }
+ Ok(ForkResult::Parent { child }) => {
+ std::mem::drop((pold, pnew));
+ log::debug!("forked off a new server (second pid: {})", child);
+ }
+ Err(e) => log::error!("fork() failed, restart delayed: {}", e),
+ }
+ // No matter how we managed to get here, this is the time where we bail out quickly:
+ unsafe {
+ libc::_exit(-1)
+ }
+ }
+ Ok(ForkResult::Parent { child }) => {
+ log::debug!("forked off a new server (first pid: {}), waiting for 2nd pid", child);
+ std::mem::drop(pnew);
+ let mut pold = unsafe {
+ std::fs::File::from_raw_fd(pold.into_raw_fd())
+ };
+ let child = nix::unistd::Pid::from_raw(match unsafe { pold.read_le_value() } {
+ Ok(v) => v,
+ Err(e) => {
+ log::error!("failed to receive pid of double-forked child process: {}", e);
+ // systemd will complain but won't kill the service...
+ return Ok(());
+ }
+ });
+
+ if let Err(e) = systemd_notify(SystemdNotify::MainPid(child)) {
+ log::error!("failed to notify systemd about the new main pid: {}", e);
+ }
+
+ // notify child that it is now the new main process:
+ if let Err(e) = pold.write_all(&[1u8]) {
+ log::error!("child vanished during reload: {}", e);
+ }
+
+ Ok(())
+ }
+ Err(e) => {
+ log::error!("fork() failed, restart delayed: {}", e);
+ Ok(())
+ }
+ }
+ }
+
+ fn do_reexec(self, args: Vec<CString>) -> Result<(), Error> {
+ let exe = CString::new(self.self_exe.as_os_str().as_bytes())?;
+ self.pre_exec()?;
+ nix::unistd::setsid()?;
+ let args: Vec<&std::ffi::CStr> = args.iter().map(|s| s.as_ref()).collect();
+ nix::unistd::execvp(&exe, &args)?;
+ panic!("exec misbehaved");
+ }
+}
+
+// For now all we need to do is store and reuse a tcp listening socket:
+impl Reloadable for tokio::net::TcpListener {
+ // NOTE: The socket must not be closed when the store-function is called:
+ // FIXME: We could become "independent" of the TcpListener and its reference to the file
+ // descriptor by `dup()`ing it (and check if the listener still exists via kcmp()?)
+ fn get_store_func(&self) -> Result<BoxedStoreFunc, Error> {
+ let mut fd_opt = Some(Fd(
+ nix::fcntl::fcntl(self.as_raw_fd(), nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0))?
+ ));
+ Ok(Box::new(move || {
+ let fd = fd_opt.take().unwrap();
+ fd_change_cloexec(fd.as_raw_fd(), false)?;
+ Ok(fd.into_raw_fd().to_string())
+ }))
+ }
+
+ fn restore(var: &str) -> Result<Self, Error> {
+ let fd = var.parse::<u32>()
+ .map_err(|e| format_err!("invalid file descriptor: {}", e))?
+ as RawFd;
+ fd_change_cloexec(fd, true)?;
+ Ok(Self::from_std(
+ unsafe { std::net::TcpListener::from_raw_fd(fd) },
+ )?)
+ }
+}
+
+pub struct NotifyReady;
+
+impl Future for NotifyReady {
+ type Output = Result<(), Error>;
+
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Error>> {
+ systemd_notify(SystemdNotify::Ready)?;
+ Poll::Ready(Ok(()))
+ }
+}
+
+/// This creates a future representing a daemon which reloads itself when receiving a SIGHUP.
+/// If this is started regularly, a listening socket is created. In this case, the file descriptor
+/// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`.
+/// If the variable already exists, its contents will instead be used to restore the listening
+/// socket. The finished listening socket is then passed to the `create_service` function which
+/// can be used to setup the TLS and the HTTP daemon.
+pub async fn create_daemon<F, S>(
+ address: std::net::SocketAddr,
+ create_service: F,
+ service_name: &str,
+) -> Result<(), Error>
+where
+ F: FnOnce(tokio::net::TcpListener, NotifyReady) -> Result<S, Error>,
+ S: Future<Output = ()> + Unpin,
+{
+ let mut reloader = Reloader::new()?;
+
+ let listener: tokio::net::TcpListener = reloader.restore(
+ "PROXMOX_BACKUP_LISTEN_FD",
+ move || async move { Ok(tokio::net::TcpListener::bind(&address).await?) },
+ ).await?;
+
+ let server_future = create_service(listener, NotifyReady)?;
+ let shutdown_future = crate::shutdown_future();
+
+ let finish_future = match future::select(server_future, shutdown_future).await {
+ Either::Left((_, _)) => {
+ crate::request_shutdown(); // make sure we are in shutdown mode
+ None
+ }
+ Either::Right((_, server_future)) => Some(server_future),
+ };
+
+ let mut reloader = Some(reloader);
+
+ if crate::is_reload_request() {
+ log::info!("daemon reload...");
+ if let Err(e) = systemd_notify(SystemdNotify::Reloading) {
+ log::error!("failed to notify systemd about the state change: {}", e);
+ }
+ wait_service_is_state(service_name, "reloading").await?;
+ if let Err(e) = reloader.take().unwrap().fork_restart() {
+ log::error!("error during reload: {}", e);
+ let _ = systemd_notify(SystemdNotify::Status("error during reload".to_string()));
+ }
+ } else {
+ log::info!("daemon shutting down...");
+ }
+
+ if let Some(future) = finish_future {
+ future.await;
+ }
+
+ // FIXME: this is a hack, replace with sd_notify_barrier when available
+ if crate::is_reload_request() {
+ wait_service_is_not_state(service_name, "reloading").await?;
+ }
+
+ log::info!("daemon shut down...");
+ Ok(())
+}
+
+// hack, do not use if unsure!
+async fn get_service_state(service: &str) -> Result<String, Error> {
+ let text = match tokio::process::Command::new("systemctl")
+ .args(&["is-active", service])
+ .output()
+ .await
+ {
+ Ok(output) => match String::from_utf8(output.stdout) {
+ Ok(text) => text,
+ Err(err) => bail!("output of 'systemctl is-active' not valid UTF-8 - {}", err),
+ },
+ Err(err) => bail!("executing 'systemctl is-active' failed - {}", err),
+ };
+
+ Ok(text.trim().trim_start().to_string())
+}
+
+async fn wait_service_is_state(service: &str, state: &str) -> Result<(), Error> {
+ tokio::time::sleep(std::time::Duration::new(1, 0)).await;
+ while get_service_state(service).await? != state {
+ tokio::time::sleep(std::time::Duration::new(5, 0)).await;
+ }
+ Ok(())
+}
+
+async fn wait_service_is_not_state(service: &str, state: &str) -> Result<(), Error> {
+ tokio::time::sleep(std::time::Duration::new(1, 0)).await;
+ while get_service_state(service).await? == state {
+ tokio::time::sleep(std::time::Duration::new(5, 0)).await;
+ }
+ Ok(())
+}
+
+#[link(name = "systemd")]
+extern "C" {
+ fn sd_notify(unset_environment: c_int, state: *const c_char) -> c_int;
+}
+
+pub enum SystemdNotify {
+ Ready,
+ Reloading,
+ Stopping,
+ Status(String),
+ MainPid(nix::unistd::Pid),
+}
+
+pub fn systemd_notify(state: SystemdNotify) -> Result<(), Error> {
+ let message = match state {
+ SystemdNotify::Ready => CString::new("READY=1"),
+ SystemdNotify::Reloading => CString::new("RELOADING=1"),
+ SystemdNotify::Stopping => CString::new("STOPPING=1"),
+ SystemdNotify::Status(msg) => CString::new(format!("STATUS={}", msg)),
+ SystemdNotify::MainPid(pid) => CString::new(format!("MAINPID={}", pid)),
+ }?;
+ let rc = unsafe { sd_notify(0, message.as_ptr()) };
+ if rc < 0 {
+ bail!(
+ "systemd_notify failed: {}",
+ std::io::Error::from_raw_os_error(-rc),
+ );
+ }
+ Ok(())
+}
-use anyhow::{bail, Error};
+use std::os::unix::io::RawFd;
+
+use anyhow::{bail, format_err, Error};
+
+use proxmox::tools::fd::Fd;
+
+pub mod daemon;
mod state;
pub use state::*;
Ok(())
}
+/// Helper to set/clear the FD_CLOEXEC flag on file descriptors
+pub fn fd_change_cloexec(fd: RawFd, on: bool) -> Result<(), Error> {
+ use nix::fcntl::{fcntl, FdFlag, F_GETFD, F_SETFD};
+ let mut flags = FdFlag::from_bits(fcntl(fd, F_GETFD)?)
+ .ok_or_else(|| format_err!("unhandled file flags"))?; // nix crate is stupid this way...
+ flags.set(FdFlag::FD_CLOEXEC, on);
+ fcntl(fd, F_SETFD(flags))?;
+ Ok(())
+}
+
+/// safe wrapper for `nix::sys::socket::socketpair` defaulting to `O_CLOEXEC` and guarding the file
+/// descriptors.
+pub fn socketpair() -> Result<(Fd, Fd), Error> {
+ use nix::sys::socket;
+ let (pa, pb) = socket::socketpair(
+ socket::AddressFamily::Unix,
+ socket::SockType::Stream,
+ None,
+ socket::SockFlag::SOCK_CLOEXEC,
+ )?;
+ Ok((Fd(pa), Fd(pb)))
+}
+
move |worker| async move {
// move inside the worker so that it survives and does not close the port
// remove CLOEXEC from listenere so that we can reuse it in termproxy
- tools::fd_change_cloexec(listener.as_raw_fd(), false)?;
+ proxmox_rest_server::fd_change_cloexec(listener.as_raw_fd(), false)?;
let mut arguments: Vec<&str> = Vec::new();
let fd_string = listener.as_raw_fd().to_string();
auth::default_api_auth,
rest::*,
};
-use proxmox_backup::tools::daemon;
+use proxmox_rest_server::daemon;
+
use proxmox_backup::auth_helpers::*;
use proxmox_backup::config;
PruneOptions,
};
+use proxmox_rest_server::daemon;
+
use proxmox_backup::server;
use proxmox_backup::auth_helpers::*;
use proxmox_backup::tools::{
PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
- daemon,
disks::{
DiskManage,
zfs_pool_stats,
+++ /dev/null
-//! Helpers for daemons/services.
-
-use std::ffi::CString;
-use std::future::Future;
-use std::io::{Read, Write};
-use std::os::raw::{c_char, c_uchar, c_int};
-use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
-use std::os::unix::ffi::OsStrExt;
-use std::panic::UnwindSafe;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-use std::path::PathBuf;
-
-use anyhow::{bail, format_err, Error};
-use futures::future::{self, Either};
-
-use proxmox::tools::io::{ReadExt, WriteExt};
-
-use crate::tools::{fd_change_cloexec, self};
-
-#[link(name = "systemd")]
-extern "C" {
- fn sd_journal_stream_fd(identifier: *const c_uchar, priority: c_int, level_prefix: c_int) -> c_int;
-}
-
-// Unfortunately FnBox is nightly-only and Box<FnOnce> is unusable, so just use Box<Fn>...
-pub type BoxedStoreFunc = Box<dyn FnMut() -> Result<String, Error> + UnwindSafe + Send>;
-
-/// Helper trait to "store" something in the environment to be re-used after re-executing the
-/// service on a reload.
-pub trait Reloadable: Sized {
- fn restore(var: &str) -> Result<Self, Error>;
- fn get_store_func(&self) -> Result<BoxedStoreFunc, Error>;
-}
-
-/// Manages things to be stored and reloaded upon reexec.
-/// Anything which should be restorable should be instantiated via this struct's `restore` method,
-#[derive(Default)]
-pub struct Reloader {
- pre_exec: Vec<PreExecEntry>,
- self_exe: PathBuf,
-}
-
-// Currently we only need environment variables for storage, but in theory we could also add
-// variants which need temporary files or pipes...
-struct PreExecEntry {
- name: &'static str, // Feel free to change to String if necessary...
- store_fn: BoxedStoreFunc,
-}
-
-impl Reloader {
- pub fn new() -> Result<Self, Error> {
- Ok(Self {
- pre_exec: Vec::new(),
-
- // Get the path to our executable as PathBuf
- self_exe: std::fs::read_link("/proc/self/exe")?,
- })
- }
-
- /// Restore an object from an environment variable of the given name, or, if none exists, uses
- /// the function provided in the `or_create` parameter to instantiate the new "first" instance.
- ///
- /// Values created via this method will be remembered for later re-execution.
- pub async fn restore<T, F, U>(&mut self, name: &'static str, or_create: F) -> Result<T, Error>
- where
- T: Reloadable,
- F: FnOnce() -> U,
- U: Future<Output = Result<T, Error>>,
- {
- let res = match std::env::var(name) {
- Ok(varstr) => T::restore(&varstr)?,
- Err(std::env::VarError::NotPresent) => or_create().await?,
- Err(_) => bail!("variable {} has invalid value", name),
- };
-
- self.pre_exec.push(PreExecEntry {
- name,
- store_fn: res.get_store_func()?,
- });
- Ok(res)
- }
-
- fn pre_exec(self) -> Result<(), Error> {
- for mut item in self.pre_exec {
- std::env::set_var(item.name, (item.store_fn)()?);
- }
- Ok(())
- }
-
- pub fn fork_restart(self) -> Result<(), Error> {
- // Get our parameters as Vec<CString>
- let args = std::env::args_os();
- let mut new_args = Vec::with_capacity(args.len());
- for arg in args {
- new_args.push(CString::new(arg.as_bytes())?);
- }
-
- // Synchronisation pipe:
- let (pold, pnew) = super::socketpair()?;
-
- // Start ourselves in the background:
- use nix::unistd::{fork, ForkResult};
- match unsafe { fork() } {
- Ok(ForkResult::Child) => {
- // Double fork so systemd can supervise us without nagging...
- match unsafe { fork() } {
- Ok(ForkResult::Child) => {
- std::mem::drop(pold);
- // At this point we call pre-exec helpers. We must be certain that if they fail for
- // whatever reason we can still call `_exit()`, so use catch_unwind.
- match std::panic::catch_unwind(move || {
- let mut pnew = unsafe {
- std::fs::File::from_raw_fd(pnew.into_raw_fd())
- };
- let pid = nix::unistd::Pid::this();
- if let Err(e) = unsafe { pnew.write_host_value(pid.as_raw()) } {
- log::error!("failed to send new server PID to parent: {}", e);
- unsafe {
- libc::_exit(-1);
- }
- }
-
- let mut ok = [0u8];
- if let Err(e) = pnew.read_exact(&mut ok) {
- log::error!("parent vanished before notifying systemd: {}", e);
- unsafe {
- libc::_exit(-1);
- }
- }
- assert_eq!(ok[0], 1, "reload handshake should have sent a 1 byte");
-
- std::mem::drop(pnew);
-
- // Try to reopen STDOUT/STDERR journald streams to get correct PID in logs
- let ident = CString::new(self.self_exe.file_name().unwrap().as_bytes()).unwrap();
- let ident = ident.as_bytes();
- let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_INFO, 1) };
- if fd >= 0 && fd != 1 {
- let fd = proxmox::tools::fd::Fd(fd); // add drop handler
- nix::unistd::dup2(fd.as_raw_fd(), 1)?;
- } else {
- log::error!("failed to update STDOUT journal redirection ({})", fd);
- }
- let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_ERR, 1) };
- if fd >= 0 && fd != 2 {
- let fd = proxmox::tools::fd::Fd(fd); // add drop handler
- nix::unistd::dup2(fd.as_raw_fd(), 2)?;
- } else {
- log::error!("failed to update STDERR journal redirection ({})", fd);
- }
-
- self.do_reexec(new_args)
- })
- {
- Ok(Ok(())) => eprintln!("do_reexec returned!"),
- Ok(Err(err)) => eprintln!("do_reexec failed: {}", err),
- Err(_) => eprintln!("panic in re-exec"),
- }
- }
- Ok(ForkResult::Parent { child }) => {
- std::mem::drop((pold, pnew));
- log::debug!("forked off a new server (second pid: {})", child);
- }
- Err(e) => log::error!("fork() failed, restart delayed: {}", e),
- }
- // No matter how we managed to get here, this is the time where we bail out quickly:
- unsafe {
- libc::_exit(-1)
- }
- }
- Ok(ForkResult::Parent { child }) => {
- log::debug!("forked off a new server (first pid: {}), waiting for 2nd pid", child);
- std::mem::drop(pnew);
- let mut pold = unsafe {
- std::fs::File::from_raw_fd(pold.into_raw_fd())
- };
- let child = nix::unistd::Pid::from_raw(match unsafe { pold.read_le_value() } {
- Ok(v) => v,
- Err(e) => {
- log::error!("failed to receive pid of double-forked child process: {}", e);
- // systemd will complain but won't kill the service...
- return Ok(());
- }
- });
-
- if let Err(e) = systemd_notify(SystemdNotify::MainPid(child)) {
- log::error!("failed to notify systemd about the new main pid: {}", e);
- }
-
- // notify child that it is now the new main process:
- if let Err(e) = pold.write_all(&[1u8]) {
- log::error!("child vanished during reload: {}", e);
- }
-
- Ok(())
- }
- Err(e) => {
- log::error!("fork() failed, restart delayed: {}", e);
- Ok(())
- }
- }
- }
-
- fn do_reexec(self, args: Vec<CString>) -> Result<(), Error> {
- let exe = CString::new(self.self_exe.as_os_str().as_bytes())?;
- self.pre_exec()?;
- nix::unistd::setsid()?;
- let args: Vec<&std::ffi::CStr> = args.iter().map(|s| s.as_ref()).collect();
- nix::unistd::execvp(&exe, &args)?;
- panic!("exec misbehaved");
- }
-}
-
-// For now all we need to do is store and reuse a tcp listening socket:
-impl Reloadable for tokio::net::TcpListener {
- // NOTE: The socket must not be closed when the store-function is called:
- // FIXME: We could become "independent" of the TcpListener and its reference to the file
- // descriptor by `dup()`ing it (and check if the listener still exists via kcmp()?)
- fn get_store_func(&self) -> Result<BoxedStoreFunc, Error> {
- let mut fd_opt = Some(tools::Fd(
- nix::fcntl::fcntl(self.as_raw_fd(), nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0))?
- ));
- Ok(Box::new(move || {
- let fd = fd_opt.take().unwrap();
- fd_change_cloexec(fd.as_raw_fd(), false)?;
- Ok(fd.into_raw_fd().to_string())
- }))
- }
-
- fn restore(var: &str) -> Result<Self, Error> {
- let fd = var.parse::<u32>()
- .map_err(|e| format_err!("invalid file descriptor: {}", e))?
- as RawFd;
- fd_change_cloexec(fd, true)?;
- Ok(Self::from_std(
- unsafe { std::net::TcpListener::from_raw_fd(fd) },
- )?)
- }
-}
-
-pub struct NotifyReady;
-
-impl Future for NotifyReady {
- type Output = Result<(), Error>;
-
- fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Error>> {
- systemd_notify(SystemdNotify::Ready)?;
- Poll::Ready(Ok(()))
- }
-}
-
-/// This creates a future representing a daemon which reloads itself when receiving a SIGHUP.
-/// If this is started regularly, a listening socket is created. In this case, the file descriptor
-/// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`.
-/// If the variable already exists, its contents will instead be used to restore the listening
-/// socket. The finished listening socket is then passed to the `create_service` function which
-/// can be used to setup the TLS and the HTTP daemon.
-pub async fn create_daemon<F, S>(
- address: std::net::SocketAddr,
- create_service: F,
- service_name: &str,
-) -> Result<(), Error>
-where
- F: FnOnce(tokio::net::TcpListener, NotifyReady) -> Result<S, Error>,
- S: Future<Output = ()> + Unpin,
-{
- let mut reloader = Reloader::new()?;
-
- let listener: tokio::net::TcpListener = reloader.restore(
- "PROXMOX_BACKUP_LISTEN_FD",
- move || async move { Ok(tokio::net::TcpListener::bind(&address).await?) },
- ).await?;
-
- let server_future = create_service(listener, NotifyReady)?;
- let shutdown_future = proxmox_rest_server::shutdown_future();
-
- let finish_future = match future::select(server_future, shutdown_future).await {
- Either::Left((_, _)) => {
- proxmox_rest_server::request_shutdown(); // make sure we are in shutdown mode
- None
- }
- Either::Right((_, server_future)) => Some(server_future),
- };
-
- let mut reloader = Some(reloader);
-
- if proxmox_rest_server::is_reload_request() {
- log::info!("daemon reload...");
- if let Err(e) = systemd_notify(SystemdNotify::Reloading) {
- log::error!("failed to notify systemd about the state change: {}", e);
- }
- wait_service_is_state(service_name, "reloading").await?;
- if let Err(e) = reloader.take().unwrap().fork_restart() {
- log::error!("error during reload: {}", e);
- let _ = systemd_notify(SystemdNotify::Status("error during reload".to_string()));
- }
- } else {
- log::info!("daemon shutting down...");
- }
-
- if let Some(future) = finish_future {
- future.await;
- }
-
- // FIXME: this is a hack, replace with sd_notify_barrier when available
- if proxmox_rest_server::is_reload_request() {
- wait_service_is_not_state(service_name, "reloading").await?;
- }
-
- log::info!("daemon shut down...");
- Ok(())
-}
-
-// hack, do not use if unsure!
-async fn get_service_state(service: &str) -> Result<String, Error> {
- let text = match tokio::process::Command::new("systemctl")
- .args(&["is-active", service])
- .output()
- .await
- {
- Ok(output) => match String::from_utf8(output.stdout) {
- Ok(text) => text,
- Err(err) => bail!("output of 'systemctl is-active' not valid UTF-8 - {}", err),
- },
- Err(err) => bail!("executing 'systemctl is-active' failed - {}", err),
- };
-
- Ok(text.trim().trim_start().to_string())
-}
-
-async fn wait_service_is_state(service: &str, state: &str) -> Result<(), Error> {
- tokio::time::sleep(std::time::Duration::new(1, 0)).await;
- while get_service_state(service).await? != state {
- tokio::time::sleep(std::time::Duration::new(5, 0)).await;
- }
- Ok(())
-}
-
-async fn wait_service_is_not_state(service: &str, state: &str) -> Result<(), Error> {
- tokio::time::sleep(std::time::Duration::new(1, 0)).await;
- while get_service_state(service).await? == state {
- tokio::time::sleep(std::time::Duration::new(5, 0)).await;
- }
- Ok(())
-}
-
-#[link(name = "systemd")]
-extern "C" {
- fn sd_notify(unset_environment: c_int, state: *const c_char) -> c_int;
-}
-
-pub enum SystemdNotify {
- Ready,
- Reloading,
- Stopping,
- Status(String),
- MainPid(nix::unistd::Pid),
-}
-
-pub fn systemd_notify(state: SystemdNotify) -> Result<(), Error> {
- let message = match state {
- SystemdNotify::Ready => CString::new("READY=1"),
- SystemdNotify::Reloading => CString::new("RELOADING=1"),
- SystemdNotify::Stopping => CString::new("STOPPING=1"),
- SystemdNotify::Status(msg) => CString::new(format!("STATUS={}", msg)),
- SystemdNotify::MainPid(pid) => CString::new(format!("MAINPID={}", pid)),
- }?;
- let rc = unsafe { sd_notify(0, message.as_ptr()) };
- if rc < 0 {
- bail!(
- "systemd_notify failed: {}",
- std::io::Error::from_raw_os_error(-rc),
- );
- }
- Ok(())
-}
//!
//! This is a collection of small and useful tools.
use std::any::Any;
-use std::os::unix::io::RawFd;
use anyhow::{bail, format_err, Error};
use openssl::hash::{hash, DigestBytes, MessageDigest};
-pub use proxmox::tools::fd::Fd;
-
use proxmox_http::{
client::SimpleHttp,
client::SimpleHttpOptions,
pub mod async_io;
pub mod compression;
pub mod config;
-pub mod daemon;
pub mod disks;
pub mod serde_filter;
Ok((path, components))
}
-pub fn fd_change_cloexec(fd: RawFd, on: bool) -> Result<(), Error> {
- use nix::fcntl::{fcntl, FdFlag, F_GETFD, F_SETFD};
- let mut flags = FdFlag::from_bits(fcntl(fd, F_GETFD)?)
- .ok_or_else(|| format_err!("unhandled file flags"))?; // nix crate is stupid this way...
- flags.set(FdFlag::FD_CLOEXEC, on);
- fcntl(fd, F_SETFD(flags))?;
- Ok(())
-}
-
-/// safe wrapper for `nix::sys::socket::socketpair` defaulting to `O_CLOEXEC` and guarding the file
-/// descriptors.
-pub fn socketpair() -> Result<(Fd, Fd), Error> {
- use nix::sys::socket;
- let (pa, pb) = socket::socketpair(
- socket::AddressFamily::Unix,
- socket::SockType::Stream,
- None,
- socket::SockFlag::SOCK_CLOEXEC,
- )?;
- Ok((Fd(pa), Fd(pb)))
-}
-
-
/// An easy way to convert types to Any
///
/// Mostly useful to downcast trait objects (see RpcEnvironment).