1 //! Helpers to implement restartable daemons/services.
use std::ffi::CString;
use std::future::Future;
use std::io::{Read, Write};
use std::os::raw::{c_char, c_int, c_uchar};
use std::os::unix::ffi::OsStrExt;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::panic::UnwindSafe;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};

use anyhow::{bail, format_err, Error};
use futures::future::{self, Either};

use proxmox::tools::fd::Fd;
use proxmox::tools::io::{ReadExt, WriteExt};

use crate::fd_change_cloexec;
#[link(name = "systemd")]
extern "C" {
    /// `sd_journal_stream_fd(3)` from libsystemd: open a stream connection to journald.
    ///
    /// Messages are tagged with `identifier`, which must point to a NUL-terminated string, and
    /// logged at `priority`. A non-zero `level_prefix` makes journald interpret kernel-style
    /// `<N>` level prefixes.
    ///
    /// Returns a new file descriptor on success, or a negative errno-style value on failure.
    fn sd_journal_stream_fd(identifier: *const c_uchar, priority: c_int, level_prefix: c_int) -> c_int;
}
27 // Unfortunately FnBox is nightly-only and Box<FnOnce> is unusable, so just use Box<Fn>...
28 type BoxedStoreFunc
= Box
<dyn FnMut() -> Result
<String
, Error
> + UnwindSafe
+ Send
>;
30 /// Helper trait to "store" something in the environment to be re-used after re-executing the
31 /// service on a reload.
32 pub trait Reloadable
: Sized
{
33 fn restore(var
: &str) -> Result
<Self, Error
>;
34 fn get_store_func(&self) -> Result
<BoxedStoreFunc
, Error
>;
37 /// Manages things to be stored and reloaded upon reexec.
38 /// Anything which should be restorable should be instantiated via this struct's `restore` method,
41 pre_exec
: Vec
<PreExecEntry
>,
45 // Currently we only need environment variables for storage, but in theory we could also add
46 // variants which need temporary files or pipes...
48 name
: &'
static str, // Feel free to change to String if necessary...
49 store_fn
: BoxedStoreFunc
,
53 pub fn new() -> Result
<Self, Error
> {
57 // Get the path to our executable as PathBuf
58 self_exe
: std
::fs
::read_link("/proc/self/exe")?
,
62 /// Restore an object from an environment variable of the given name, or, if none exists, uses
63 /// the function provided in the `or_create` parameter to instantiate the new "first" instance.
65 /// Values created via this method will be remembered for later re-execution.
66 pub async
fn restore
<T
, F
, U
>(&mut self, name
: &'
static str, or_create
: F
) -> Result
<T
, Error
>
70 U
: Future
<Output
= Result
<T
, Error
>>,
72 let res
= match std
::env
::var(name
) {
73 Ok(varstr
) => T
::restore(&varstr
)?
,
74 Err(std
::env
::VarError
::NotPresent
) => or_create().await?
,
75 Err(_
) => bail
!("variable {} has invalid value", name
),
78 self.pre_exec
.push(PreExecEntry
{
80 store_fn
: res
.get_store_func()?
,
85 fn pre_exec(self) -> Result
<(), Error
> {
86 for mut item
in self.pre_exec
{
87 std
::env
::set_var(item
.name
, (item
.store_fn
)()?
);
92 pub fn fork_restart(self) -> Result
<(), Error
> {
93 // Get our parameters as Vec<CString>
94 let args
= std
::env
::args_os();
95 let mut new_args
= Vec
::with_capacity(args
.len());
97 new_args
.push(CString
::new(arg
.as_bytes())?
);
100 // Synchronisation pipe:
101 let (pold
, pnew
) = super::socketpair()?
;
103 // Start ourselves in the background:
104 use nix
::unistd
::{fork, ForkResult}
;
105 match unsafe { fork() }
{
106 Ok(ForkResult
::Child
) => {
107 // Double fork so systemd can supervise us without nagging...
108 match unsafe { fork() }
{
109 Ok(ForkResult
::Child
) => {
110 std
::mem
::drop(pold
);
111 // At this point we call pre-exec helpers. We must be certain that if they fail for
112 // whatever reason we can still call `_exit()`, so use catch_unwind.
113 match std
::panic
::catch_unwind(move || {
114 let mut pnew
= unsafe {
115 std
::fs
::File
::from_raw_fd(pnew
.into_raw_fd())
117 let pid
= nix
::unistd
::Pid
::this();
118 if let Err(e
) = unsafe { pnew.write_host_value(pid.as_raw()) }
{
119 log
::error
!("failed to send new server PID to parent: {}", e
);
126 if let Err(e
) = pnew
.read_exact(&mut ok
) {
127 log
::error
!("parent vanished before notifying systemd: {}", e
);
132 assert_eq
!(ok
[0], 1, "reload handshake should have sent a 1 byte");
134 std
::mem
::drop(pnew
);
136 // Try to reopen STDOUT/STDERR journald streams to get correct PID in logs
137 let ident
= CString
::new(self.self_exe
.file_name().unwrap().as_bytes()).unwrap();
138 let ident
= ident
.as_bytes();
139 let fd
= unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_INFO, 1) }
;
140 if fd
>= 0 && fd
!= 1 {
141 let fd
= proxmox
::tools
::fd
::Fd(fd
); // add drop handler
142 nix
::unistd
::dup2(fd
.as_raw_fd(), 1)?
;
144 log
::error
!("failed to update STDOUT journal redirection ({})", fd
);
146 let fd
= unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_ERR, 1) }
;
147 if fd
>= 0 && fd
!= 2 {
148 let fd
= proxmox
::tools
::fd
::Fd(fd
); // add drop handler
149 nix
::unistd
::dup2(fd
.as_raw_fd(), 2)?
;
151 log
::error
!("failed to update STDERR journal redirection ({})", fd
);
154 self.do_reexec(new_args
)
157 Ok(Ok(())) => eprintln
!("do_reexec returned!"),
158 Ok(Err(err
)) => eprintln
!("do_reexec failed: {}", err
),
159 Err(_
) => eprintln
!("panic in re-exec"),
162 Ok(ForkResult
::Parent { child }
) => {
163 std
::mem
::drop((pold
, pnew
));
164 log
::debug
!("forked off a new server (second pid: {})", child
);
166 Err(e
) => log
::error
!("fork() failed, restart delayed: {}", e
),
168 // No matter how we managed to get here, this is the time where we bail out quickly:
173 Ok(ForkResult
::Parent { child }
) => {
174 log
::debug
!("forked off a new server (first pid: {}), waiting for 2nd pid", child
);
175 std
::mem
::drop(pnew
);
176 let mut pold
= unsafe {
177 std
::fs
::File
::from_raw_fd(pold
.into_raw_fd())
179 let child
= nix
::unistd
::Pid
::from_raw(match unsafe { pold.read_le_value() }
{
182 log
::error
!("failed to receive pid of double-forked child process: {}", e
);
183 // systemd will complain but won't kill the service...
188 if let Err(e
) = systemd_notify(SystemdNotify
::MainPid(child
)) {
189 log
::error
!("failed to notify systemd about the new main pid: {}", e
);
192 // notify child that it is now the new main process:
193 if let Err(e
) = pold
.write_all(&[1u8]) {
194 log
::error
!("child vanished during reload: {}", e
);
200 log
::error
!("fork() failed, restart delayed: {}", e
);
206 fn do_reexec(self, args
: Vec
<CString
>) -> Result
<(), Error
> {
207 let exe
= CString
::new(self.self_exe
.as_os_str().as_bytes())?
;
209 nix
::unistd
::setsid()?
;
210 let args
: Vec
<&std
::ffi
::CStr
> = args
.iter().map(|s
| s
.as_ref()).collect();
211 nix
::unistd
::execvp(&exe
, &args
)?
;
212 panic
!("exec misbehaved");
216 // For now all we need to do is store and reuse a tcp listening socket:
217 impl Reloadable
for tokio
::net
::TcpListener
{
218 // NOTE: The socket must not be closed when the store-function is called:
219 // FIXME: We could become "independent" of the TcpListener and its reference to the file
220 // descriptor by `dup()`ing it (and check if the listener still exists via kcmp()?)
221 fn get_store_func(&self) -> Result
<BoxedStoreFunc
, Error
> {
222 let mut fd_opt
= Some(Fd(
223 nix
::fcntl
::fcntl(self.as_raw_fd(), nix
::fcntl
::FcntlArg
::F_DUPFD_CLOEXEC(0))?
225 Ok(Box
::new(move || {
226 let fd
= fd_opt
.take().unwrap();
227 fd_change_cloexec(fd
.as_raw_fd(), false)?
;
228 Ok(fd
.into_raw_fd().to_string())
232 fn restore(var
: &str) -> Result
<Self, Error
> {
233 let fd
= var
.parse
::<u32>()
234 .map_err(|e
| format_err
!("invalid file descriptor: {}", e
))?
236 fd_change_cloexec(fd
, true)?
;
238 unsafe { std::net::TcpListener::from_raw_fd(fd) }
,
243 pub struct NotifyReady
;
245 impl Future
for NotifyReady
{
246 type Output
= Result
<(), Error
>;
248 fn poll(self: Pin
<&mut Self>, _cx
: &mut Context
) -> Poll
<Result
<(), Error
>> {
249 systemd_notify(SystemdNotify
::Ready
)?
;
254 /// This creates a future representing a daemon which reloads itself when receiving a SIGHUP.
255 /// If this is started regularly, a listening socket is created. In this case, the file descriptor
256 /// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`.
257 /// If the variable already exists, its contents will instead be used to restore the listening
258 /// socket. The finished listening socket is then passed to the `create_service` function which
259 /// can be used to setup the TLS and the HTTP daemon.
260 pub async
fn create_daemon
<F
, S
>(
261 address
: std
::net
::SocketAddr
,
264 ) -> Result
<(), Error
>
266 F
: FnOnce(tokio
::net
::TcpListener
, NotifyReady
) -> Result
<S
, Error
>,
267 S
: Future
<Output
= ()> + Unpin
,
269 let mut reloader
= Reloader
::new()?
;
271 let listener
: tokio
::net
::TcpListener
= reloader
.restore(
272 "PROXMOX_BACKUP_LISTEN_FD",
273 move || async
move { Ok(tokio::net::TcpListener::bind(&address).await?) }
,
276 let server_future
= create_service(listener
, NotifyReady
)?
;
277 let shutdown_future
= crate::shutdown_future();
279 let finish_future
= match future
::select(server_future
, shutdown_future
).await
{
280 Either
::Left((_
, _
)) => {
281 crate::request_shutdown(); // make sure we are in shutdown mode
284 Either
::Right((_
, server_future
)) => Some(server_future
),
287 let mut reloader
= Some(reloader
);
289 if crate::is_reload_request() {
290 log
::info
!("daemon reload...");
291 if let Err(e
) = systemd_notify(SystemdNotify
::Reloading
) {
292 log
::error
!("failed to notify systemd about the state change: {}", e
);
294 wait_service_is_state(service_name
, "reloading").await?
;
295 if let Err(e
) = reloader
.take().unwrap().fork_restart() {
296 log
::error
!("error during reload: {}", e
);
297 let _
= systemd_notify(SystemdNotify
::Status("error during reload".to_string()));
300 log
::info
!("daemon shutting down...");
303 if let Some(future
) = finish_future
{
307 // FIXME: this is a hack, replace with sd_notify_barrier when available
308 if crate::is_reload_request() {
309 wait_service_is_not_state(service_name
, "reloading").await?
;
312 log
::info
!("daemon shut down...");
316 // hack, do not use if unsure!
317 async
fn get_service_state(service
: &str) -> Result
<String
, Error
> {
318 let text
= match tokio
::process
::Command
::new("systemctl")
319 .args(&["is-active", service
])
323 Ok(output
) => match String
::from_utf8(output
.stdout
) {
325 Err(err
) => bail
!("output of 'systemctl is-active' not valid UTF-8 - {}", err
),
327 Err(err
) => bail
!("executing 'systemctl is-active' failed - {}", err
),
330 Ok(text
.trim().trim_start().to_string())
333 async
fn wait_service_is_state(service
: &str, state
: &str) -> Result
<(), Error
> {
334 tokio
::time
::sleep(std
::time
::Duration
::new(1, 0)).await
;
335 while get_service_state(service
).await?
!= state
{
336 tokio
::time
::sleep(std
::time
::Duration
::new(5, 0)).await
;
341 async
fn wait_service_is_not_state(service
: &str, state
: &str) -> Result
<(), Error
> {
342 tokio
::time
::sleep(std
::time
::Duration
::new(1, 0)).await
;
343 while get_service_state(service
).await?
== state
{
344 tokio
::time
::sleep(std
::time
::Duration
::new(5, 0)).await
;
#[link(name = "systemd")]
extern "C" {
    /// `sd_notify(3)` from libsystemd: send the newline-separated `state` assignments (a
    /// NUL-terminated string such as `READY=1`) to the service manager.
    ///
    /// A non-zero `unset_environment` unsets `$NOTIFY_SOCKET` afterwards. Returns a negative
    /// errno-style value on failure, 0 if `$NOTIFY_SOCKET` is not set, and a positive value on
    /// success.
    fn sd_notify(unset_environment: c_int, state: *const c_char) -> c_int;
}
354 /// Systemd sercice startup states (see: ``man sd_notify``)
355 pub enum SystemdNotify
{
360 MainPid(nix
::unistd
::Pid
),
363 /// Tells systemd the startup state of the service (see: ``man sd_notify``)
364 pub fn systemd_notify(state
: SystemdNotify
) -> Result
<(), Error
> {
365 let message
= match state
{
366 SystemdNotify
::Ready
=> CString
::new("READY=1"),
367 SystemdNotify
::Reloading
=> CString
::new("RELOADING=1"),
368 SystemdNotify
::Stopping
=> CString
::new("STOPPING=1"),
369 SystemdNotify
::Status(msg
) => CString
::new(format
!("STATUS={}", msg
)),
370 SystemdNotify
::MainPid(pid
) => CString
::new(format
!("MAINPID={}", pid
)),
372 let rc
= unsafe { sd_notify(0, message.as_ptr()) }
;
375 "systemd_notify failed: {}",
376 std
::io
::Error
::from_raw_os_error(-rc
),