]> git.proxmox.com Git - proxmox-backup.git/blob - proxmox-rest-server/src/daemon.rs
c527a586c416766b09c904ab602bdb7bfa6d1a1e
[proxmox-backup.git] / proxmox-rest-server / src / daemon.rs
1 //! Helpers to implement restartable daemons/services.
2
3 use std::ffi::CString;
4 use std::future::Future;
5 use std::io::{Read, Write};
6 use std::os::raw::{c_char, c_uchar, c_int};
7 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
8 use std::os::unix::ffi::OsStrExt;
9 use std::panic::UnwindSafe;
10 use std::pin::Pin;
11 use std::task::{Context, Poll};
12 use std::path::PathBuf;
13
14 use anyhow::{bail, format_err, Error};
15 use futures::future::{self, Either};
16
17 use proxmox::tools::io::{ReadExt, WriteExt};
18 use proxmox::tools::fd::Fd;
19
20 use crate::fd_change_cloexec;
21
// Binding to libsystemd's journal stream helper (see `man sd_journal_stream_fd`).
// Callers in this file treat a negative return value as failure and anything else as a file
// descriptor they own.
#[link(name = "systemd")]
extern "C" {
    fn sd_journal_stream_fd(
        identifier: *const c_uchar,
        priority: c_int,
        level_prefix: c_int,
    ) -> c_int;
}
26
27 // Unfortunately FnBox is nightly-only and Box<FnOnce> is unusable, so just use Box<Fn>...
28 type BoxedStoreFunc = Box<dyn FnMut() -> Result<String, Error> + UnwindSafe + Send>;
29
30 /// Helper trait to "store" something in the environment to be re-used after re-executing the
31 /// service on a reload.
32 pub trait Reloadable: Sized {
33 fn restore(var: &str) -> Result<Self, Error>;
34 fn get_store_func(&self) -> Result<BoxedStoreFunc, Error>;
35 }
36
37 /// Manages things to be stored and reloaded upon reexec.
38 /// Anything which should be restorable should be instantiated via this struct's `restore` method,
39 #[derive(Default)]
40 pub struct Reloader {
41 pre_exec: Vec<PreExecEntry>,
42 self_exe: PathBuf,
43 }
44
45 // Currently we only need environment variables for storage, but in theory we could also add
46 // variants which need temporary files or pipes...
47 struct PreExecEntry {
48 name: &'static str, // Feel free to change to String if necessary...
49 store_fn: BoxedStoreFunc,
50 }
51
52 impl Reloader {
53 pub fn new() -> Result<Self, Error> {
54 Ok(Self {
55 pre_exec: Vec::new(),
56
57 // Get the path to our executable as PathBuf
58 self_exe: std::fs::read_link("/proc/self/exe")?,
59 })
60 }
61
62 /// Restore an object from an environment variable of the given name, or, if none exists, uses
63 /// the function provided in the `or_create` parameter to instantiate the new "first" instance.
64 ///
65 /// Values created via this method will be remembered for later re-execution.
66 pub async fn restore<T, F, U>(&mut self, name: &'static str, or_create: F) -> Result<T, Error>
67 where
68 T: Reloadable,
69 F: FnOnce() -> U,
70 U: Future<Output = Result<T, Error>>,
71 {
72 let res = match std::env::var(name) {
73 Ok(varstr) => T::restore(&varstr)?,
74 Err(std::env::VarError::NotPresent) => or_create().await?,
75 Err(_) => bail!("variable {} has invalid value", name),
76 };
77
78 self.pre_exec.push(PreExecEntry {
79 name,
80 store_fn: res.get_store_func()?,
81 });
82 Ok(res)
83 }
84
85 fn pre_exec(self) -> Result<(), Error> {
86 for mut item in self.pre_exec {
87 std::env::set_var(item.name, (item.store_fn)()?);
88 }
89 Ok(())
90 }
91
92 pub fn fork_restart(self) -> Result<(), Error> {
93 // Get our parameters as Vec<CString>
94 let args = std::env::args_os();
95 let mut new_args = Vec::with_capacity(args.len());
96 for arg in args {
97 new_args.push(CString::new(arg.as_bytes())?);
98 }
99
100 // Synchronisation pipe:
101 let (pold, pnew) = super::socketpair()?;
102
103 // Start ourselves in the background:
104 use nix::unistd::{fork, ForkResult};
105 match unsafe { fork() } {
106 Ok(ForkResult::Child) => {
107 // Double fork so systemd can supervise us without nagging...
108 match unsafe { fork() } {
109 Ok(ForkResult::Child) => {
110 std::mem::drop(pold);
111 // At this point we call pre-exec helpers. We must be certain that if they fail for
112 // whatever reason we can still call `_exit()`, so use catch_unwind.
113 match std::panic::catch_unwind(move || {
114 let mut pnew = unsafe {
115 std::fs::File::from_raw_fd(pnew.into_raw_fd())
116 };
117 let pid = nix::unistd::Pid::this();
118 if let Err(e) = unsafe { pnew.write_host_value(pid.as_raw()) } {
119 log::error!("failed to send new server PID to parent: {}", e);
120 unsafe {
121 libc::_exit(-1);
122 }
123 }
124
125 let mut ok = [0u8];
126 if let Err(e) = pnew.read_exact(&mut ok) {
127 log::error!("parent vanished before notifying systemd: {}", e);
128 unsafe {
129 libc::_exit(-1);
130 }
131 }
132 assert_eq!(ok[0], 1, "reload handshake should have sent a 1 byte");
133
134 std::mem::drop(pnew);
135
136 // Try to reopen STDOUT/STDERR journald streams to get correct PID in logs
137 let ident = CString::new(self.self_exe.file_name().unwrap().as_bytes()).unwrap();
138 let ident = ident.as_bytes();
139 let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_INFO, 1) };
140 if fd >= 0 && fd != 1 {
141 let fd = proxmox::tools::fd::Fd(fd); // add drop handler
142 nix::unistd::dup2(fd.as_raw_fd(), 1)?;
143 } else {
144 log::error!("failed to update STDOUT journal redirection ({})", fd);
145 }
146 let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_ERR, 1) };
147 if fd >= 0 && fd != 2 {
148 let fd = proxmox::tools::fd::Fd(fd); // add drop handler
149 nix::unistd::dup2(fd.as_raw_fd(), 2)?;
150 } else {
151 log::error!("failed to update STDERR journal redirection ({})", fd);
152 }
153
154 self.do_reexec(new_args)
155 })
156 {
157 Ok(Ok(())) => eprintln!("do_reexec returned!"),
158 Ok(Err(err)) => eprintln!("do_reexec failed: {}", err),
159 Err(_) => eprintln!("panic in re-exec"),
160 }
161 }
162 Ok(ForkResult::Parent { child }) => {
163 std::mem::drop((pold, pnew));
164 log::debug!("forked off a new server (second pid: {})", child);
165 }
166 Err(e) => log::error!("fork() failed, restart delayed: {}", e),
167 }
168 // No matter how we managed to get here, this is the time where we bail out quickly:
169 unsafe {
170 libc::_exit(-1)
171 }
172 }
173 Ok(ForkResult::Parent { child }) => {
174 log::debug!("forked off a new server (first pid: {}), waiting for 2nd pid", child);
175 std::mem::drop(pnew);
176 let mut pold = unsafe {
177 std::fs::File::from_raw_fd(pold.into_raw_fd())
178 };
179 let child = nix::unistd::Pid::from_raw(match unsafe { pold.read_le_value() } {
180 Ok(v) => v,
181 Err(e) => {
182 log::error!("failed to receive pid of double-forked child process: {}", e);
183 // systemd will complain but won't kill the service...
184 return Ok(());
185 }
186 });
187
188 if let Err(e) = systemd_notify(SystemdNotify::MainPid(child)) {
189 log::error!("failed to notify systemd about the new main pid: {}", e);
190 }
191
192 // notify child that it is now the new main process:
193 if let Err(e) = pold.write_all(&[1u8]) {
194 log::error!("child vanished during reload: {}", e);
195 }
196
197 Ok(())
198 }
199 Err(e) => {
200 log::error!("fork() failed, restart delayed: {}", e);
201 Ok(())
202 }
203 }
204 }
205
206 fn do_reexec(self, args: Vec<CString>) -> Result<(), Error> {
207 let exe = CString::new(self.self_exe.as_os_str().as_bytes())?;
208 self.pre_exec()?;
209 nix::unistd::setsid()?;
210 let args: Vec<&std::ffi::CStr> = args.iter().map(|s| s.as_ref()).collect();
211 nix::unistd::execvp(&exe, &args)?;
212 panic!("exec misbehaved");
213 }
214 }
215
216 // For now all we need to do is store and reuse a tcp listening socket:
217 impl Reloadable for tokio::net::TcpListener {
218 // NOTE: The socket must not be closed when the store-function is called:
219 // FIXME: We could become "independent" of the TcpListener and its reference to the file
220 // descriptor by `dup()`ing it (and check if the listener still exists via kcmp()?)
221 fn get_store_func(&self) -> Result<BoxedStoreFunc, Error> {
222 let mut fd_opt = Some(Fd(
223 nix::fcntl::fcntl(self.as_raw_fd(), nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0))?
224 ));
225 Ok(Box::new(move || {
226 let fd = fd_opt.take().unwrap();
227 fd_change_cloexec(fd.as_raw_fd(), false)?;
228 Ok(fd.into_raw_fd().to_string())
229 }))
230 }
231
232 fn restore(var: &str) -> Result<Self, Error> {
233 let fd = var.parse::<u32>()
234 .map_err(|e| format_err!("invalid file descriptor: {}", e))?
235 as RawFd;
236 fd_change_cloexec(fd, true)?;
237 Ok(Self::from_std(
238 unsafe { std::net::TcpListener::from_raw_fd(fd) },
239 )?)
240 }
241 }
242
243 pub struct NotifyReady;
244
245 impl Future for NotifyReady {
246 type Output = Result<(), Error>;
247
248 fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Error>> {
249 systemd_notify(SystemdNotify::Ready)?;
250 Poll::Ready(Ok(()))
251 }
252 }
253
254 /// This creates a future representing a daemon which reloads itself when receiving a SIGHUP.
255 /// If this is started regularly, a listening socket is created. In this case, the file descriptor
256 /// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`.
257 /// If the variable already exists, its contents will instead be used to restore the listening
258 /// socket. The finished listening socket is then passed to the `create_service` function which
259 /// can be used to setup the TLS and the HTTP daemon.
260 pub async fn create_daemon<F, S>(
261 address: std::net::SocketAddr,
262 create_service: F,
263 service_name: &str,
264 ) -> Result<(), Error>
265 where
266 F: FnOnce(tokio::net::TcpListener, NotifyReady) -> Result<S, Error>,
267 S: Future<Output = ()> + Unpin,
268 {
269 let mut reloader = Reloader::new()?;
270
271 let listener: tokio::net::TcpListener = reloader.restore(
272 "PROXMOX_BACKUP_LISTEN_FD",
273 move || async move { Ok(tokio::net::TcpListener::bind(&address).await?) },
274 ).await?;
275
276 let server_future = create_service(listener, NotifyReady)?;
277 let shutdown_future = crate::shutdown_future();
278
279 let finish_future = match future::select(server_future, shutdown_future).await {
280 Either::Left((_, _)) => {
281 crate::request_shutdown(); // make sure we are in shutdown mode
282 None
283 }
284 Either::Right((_, server_future)) => Some(server_future),
285 };
286
287 let mut reloader = Some(reloader);
288
289 if crate::is_reload_request() {
290 log::info!("daemon reload...");
291 if let Err(e) = systemd_notify(SystemdNotify::Reloading) {
292 log::error!("failed to notify systemd about the state change: {}", e);
293 }
294 wait_service_is_state(service_name, "reloading").await?;
295 if let Err(e) = reloader.take().unwrap().fork_restart() {
296 log::error!("error during reload: {}", e);
297 let _ = systemd_notify(SystemdNotify::Status("error during reload".to_string()));
298 }
299 } else {
300 log::info!("daemon shutting down...");
301 }
302
303 if let Some(future) = finish_future {
304 future.await;
305 }
306
307 // FIXME: this is a hack, replace with sd_notify_barrier when available
308 if crate::is_reload_request() {
309 wait_service_is_not_state(service_name, "reloading").await?;
310 }
311
312 log::info!("daemon shut down...");
313 Ok(())
314 }
315
316 // hack, do not use if unsure!
317 async fn get_service_state(service: &str) -> Result<String, Error> {
318 let text = match tokio::process::Command::new("systemctl")
319 .args(&["is-active", service])
320 .output()
321 .await
322 {
323 Ok(output) => match String::from_utf8(output.stdout) {
324 Ok(text) => text,
325 Err(err) => bail!("output of 'systemctl is-active' not valid UTF-8 - {}", err),
326 },
327 Err(err) => bail!("executing 'systemctl is-active' failed - {}", err),
328 };
329
330 Ok(text.trim().trim_start().to_string())
331 }
332
333 async fn wait_service_is_state(service: &str, state: &str) -> Result<(), Error> {
334 tokio::time::sleep(std::time::Duration::new(1, 0)).await;
335 while get_service_state(service).await? != state {
336 tokio::time::sleep(std::time::Duration::new(5, 0)).await;
337 }
338 Ok(())
339 }
340
341 async fn wait_service_is_not_state(service: &str, state: &str) -> Result<(), Error> {
342 tokio::time::sleep(std::time::Duration::new(1, 0)).await;
343 while get_service_state(service).await? == state {
344 tokio::time::sleep(std::time::Duration::new(5, 0)).await;
345 }
346 Ok(())
347 }
348
// Binding to libsystemd's notification call (see `man sd_notify`).
// `systemd_notify` below treats a negative return value as a negated errno.
#[link(name = "systemd")]
extern "C" {
    fn sd_notify(
        unset_environment: c_int,
        state: *const c_char,
    ) -> c_int;
}
353
354 /// Systemd sercice startup states (see: ``man sd_notify``)
355 pub enum SystemdNotify {
356 Ready,
357 Reloading,
358 Stopping,
359 Status(String),
360 MainPid(nix::unistd::Pid),
361 }
362
363 /// Tells systemd the startup state of the service (see: ``man sd_notify``)
364 pub fn systemd_notify(state: SystemdNotify) -> Result<(), Error> {
365 let message = match state {
366 SystemdNotify::Ready => CString::new("READY=1"),
367 SystemdNotify::Reloading => CString::new("RELOADING=1"),
368 SystemdNotify::Stopping => CString::new("STOPPING=1"),
369 SystemdNotify::Status(msg) => CString::new(format!("STATUS={}", msg)),
370 SystemdNotify::MainPid(pid) => CString::new(format!("MAINPID={}", pid)),
371 }?;
372 let rc = unsafe { sd_notify(0, message.as_ptr()) };
373 if rc < 0 {
374 bail!(
375 "systemd_notify failed: {}",
376 std::io::Error::from_raw_os_error(-rc),
377 );
378 }
379 Ok(())
380 }