]> git.proxmox.com Git - proxmox-backup.git/blob - proxmox-rest-server/src/daemon.rs
proxmox-rest-server: avoid useless call to request_shutdown
[proxmox-backup.git] / proxmox-rest-server / src / daemon.rs
1 //! Helpers to implement restartable daemons/services.
2
3 use std::ffi::CString;
4 use std::future::Future;
5 use std::io::{Read, Write};
6 use std::os::raw::{c_char, c_uchar, c_int};
7 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
8 use std::os::unix::ffi::OsStrExt;
9 use std::panic::UnwindSafe;
10 use std::path::PathBuf;
11
12 use anyhow::{bail, format_err, Error};
13 use futures::future::{self, Either};
14
15 use proxmox::tools::io::{ReadExt, WriteExt};
16 use proxmox::tools::fd::Fd;
17
18 use crate::fd_change_cloexec;
19
#[link(name = "systemd")]
extern "C" {
    /// Open a stream connection to journald for `identifier` with the given default `priority`
    /// (see: ``man sd_journal_stream_fd``).
    ///
    /// Returns the new file descriptor, or a negative value on failure.
    fn sd_journal_stream_fd(
        identifier: *const c_uchar,
        priority: c_int,
        level_prefix: c_int,
    ) -> c_int;
}
24
25 // Unfortunately FnBox is nightly-only and Box<FnOnce> is unusable, so just use Box<Fn>...
26 type BoxedStoreFunc = Box<dyn FnMut() -> Result<String, Error> + UnwindSafe + Send>;
27
28 /// Helper trait to "store" something in the environment to be re-used after re-executing the
29 /// service on a reload.
30 pub trait Reloadable: Sized {
31 fn restore(var: &str) -> Result<Self, Error>;
32 fn get_store_func(&self) -> Result<BoxedStoreFunc, Error>;
33 }
34
35 /// Manages things to be stored and reloaded upon reexec.
36 /// Anything which should be restorable should be instantiated via this struct's `restore` method,
37 #[derive(Default)]
38 pub struct Reloader {
39 pre_exec: Vec<PreExecEntry>,
40 self_exe: PathBuf,
41 }
42
43 // Currently we only need environment variables for storage, but in theory we could also add
44 // variants which need temporary files or pipes...
45 struct PreExecEntry {
46 name: &'static str, // Feel free to change to String if necessary...
47 store_fn: BoxedStoreFunc,
48 }
49
50 impl Reloader {
51 pub fn new() -> Result<Self, Error> {
52 Ok(Self {
53 pre_exec: Vec::new(),
54
55 // Get the path to our executable as PathBuf
56 self_exe: std::fs::read_link("/proc/self/exe")?,
57 })
58 }
59
60 /// Restore an object from an environment variable of the given name, or, if none exists, uses
61 /// the function provided in the `or_create` parameter to instantiate the new "first" instance.
62 ///
63 /// Values created via this method will be remembered for later re-execution.
64 pub async fn restore<T, F, U>(&mut self, name: &'static str, or_create: F) -> Result<T, Error>
65 where
66 T: Reloadable,
67 F: FnOnce() -> U,
68 U: Future<Output = Result<T, Error>>,
69 {
70 let res = match std::env::var(name) {
71 Ok(varstr) => T::restore(&varstr)?,
72 Err(std::env::VarError::NotPresent) => or_create().await?,
73 Err(_) => bail!("variable {} has invalid value", name),
74 };
75
76 self.pre_exec.push(PreExecEntry {
77 name,
78 store_fn: res.get_store_func()?,
79 });
80 Ok(res)
81 }
82
83 fn pre_exec(self) -> Result<(), Error> {
84 for mut item in self.pre_exec {
85 std::env::set_var(item.name, (item.store_fn)()?);
86 }
87 Ok(())
88 }
89
90 pub fn fork_restart(self) -> Result<(), Error> {
91 // Get our parameters as Vec<CString>
92 let args = std::env::args_os();
93 let mut new_args = Vec::with_capacity(args.len());
94 for arg in args {
95 new_args.push(CString::new(arg.as_bytes())?);
96 }
97
98 // Synchronisation pipe:
99 let (pold, pnew) = super::socketpair()?;
100
101 // Start ourselves in the background:
102 use nix::unistd::{fork, ForkResult};
103 match unsafe { fork() } {
104 Ok(ForkResult::Child) => {
105 // Double fork so systemd can supervise us without nagging...
106 match unsafe { fork() } {
107 Ok(ForkResult::Child) => {
108 std::mem::drop(pold);
109 // At this point we call pre-exec helpers. We must be certain that if they fail for
110 // whatever reason we can still call `_exit()`, so use catch_unwind.
111 match std::panic::catch_unwind(move || {
112 let mut pnew = unsafe {
113 std::fs::File::from_raw_fd(pnew.into_raw_fd())
114 };
115 let pid = nix::unistd::Pid::this();
116 if let Err(e) = unsafe { pnew.write_host_value(pid.as_raw()) } {
117 log::error!("failed to send new server PID to parent: {}", e);
118 unsafe {
119 libc::_exit(-1);
120 }
121 }
122
123 let mut ok = [0u8];
124 if let Err(e) = pnew.read_exact(&mut ok) {
125 log::error!("parent vanished before notifying systemd: {}", e);
126 unsafe {
127 libc::_exit(-1);
128 }
129 }
130 assert_eq!(ok[0], 1, "reload handshake should have sent a 1 byte");
131
132 std::mem::drop(pnew);
133
134 // Try to reopen STDOUT/STDERR journald streams to get correct PID in logs
135 let ident = CString::new(self.self_exe.file_name().unwrap().as_bytes()).unwrap();
136 let ident = ident.as_bytes();
137 let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_INFO, 1) };
138 if fd >= 0 && fd != 1 {
139 let fd = proxmox::tools::fd::Fd(fd); // add drop handler
140 nix::unistd::dup2(fd.as_raw_fd(), 1)?;
141 } else {
142 log::error!("failed to update STDOUT journal redirection ({})", fd);
143 }
144 let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_ERR, 1) };
145 if fd >= 0 && fd != 2 {
146 let fd = proxmox::tools::fd::Fd(fd); // add drop handler
147 nix::unistd::dup2(fd.as_raw_fd(), 2)?;
148 } else {
149 log::error!("failed to update STDERR journal redirection ({})", fd);
150 }
151
152 self.do_reexec(new_args)
153 })
154 {
155 Ok(Ok(())) => eprintln!("do_reexec returned!"),
156 Ok(Err(err)) => eprintln!("do_reexec failed: {}", err),
157 Err(_) => eprintln!("panic in re-exec"),
158 }
159 }
160 Ok(ForkResult::Parent { child }) => {
161 std::mem::drop((pold, pnew));
162 log::debug!("forked off a new server (second pid: {})", child);
163 }
164 Err(e) => log::error!("fork() failed, restart delayed: {}", e),
165 }
166 // No matter how we managed to get here, this is the time where we bail out quickly:
167 unsafe {
168 libc::_exit(-1)
169 }
170 }
171 Ok(ForkResult::Parent { child }) => {
172 log::debug!("forked off a new server (first pid: {}), waiting for 2nd pid", child);
173 std::mem::drop(pnew);
174 let mut pold = unsafe {
175 std::fs::File::from_raw_fd(pold.into_raw_fd())
176 };
177 let child = nix::unistd::Pid::from_raw(match unsafe { pold.read_le_value() } {
178 Ok(v) => v,
179 Err(e) => {
180 log::error!("failed to receive pid of double-forked child process: {}", e);
181 // systemd will complain but won't kill the service...
182 return Ok(());
183 }
184 });
185
186 if let Err(e) = systemd_notify(SystemdNotify::MainPid(child)) {
187 log::error!("failed to notify systemd about the new main pid: {}", e);
188 }
189
190 // notify child that it is now the new main process:
191 if let Err(e) = pold.write_all(&[1u8]) {
192 log::error!("child vanished during reload: {}", e);
193 }
194
195 Ok(())
196 }
197 Err(e) => {
198 log::error!("fork() failed, restart delayed: {}", e);
199 Ok(())
200 }
201 }
202 }
203
204 fn do_reexec(self, args: Vec<CString>) -> Result<(), Error> {
205 let exe = CString::new(self.self_exe.as_os_str().as_bytes())?;
206 self.pre_exec()?;
207 nix::unistd::setsid()?;
208 let args: Vec<&std::ffi::CStr> = args.iter().map(|s| s.as_ref()).collect();
209 nix::unistd::execvp(&exe, &args)?;
210 panic!("exec misbehaved");
211 }
212 }
213
214 // For now all we need to do is store and reuse a tcp listening socket:
215 impl Reloadable for tokio::net::TcpListener {
216 // NOTE: The socket must not be closed when the store-function is called:
217 // FIXME: We could become "independent" of the TcpListener and its reference to the file
218 // descriptor by `dup()`ing it (and check if the listener still exists via kcmp()?)
219 fn get_store_func(&self) -> Result<BoxedStoreFunc, Error> {
220 let mut fd_opt = Some(Fd(
221 nix::fcntl::fcntl(self.as_raw_fd(), nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0))?
222 ));
223 Ok(Box::new(move || {
224 let fd = fd_opt.take().unwrap();
225 fd_change_cloexec(fd.as_raw_fd(), false)?;
226 Ok(fd.into_raw_fd().to_string())
227 }))
228 }
229
230 fn restore(var: &str) -> Result<Self, Error> {
231 let fd = var.parse::<u32>()
232 .map_err(|e| format_err!("invalid file descriptor: {}", e))?
233 as RawFd;
234 fd_change_cloexec(fd, true)?;
235 Ok(Self::from_std(
236 unsafe { std::net::TcpListener::from_raw_fd(fd) },
237 )?)
238 }
239 }
240
241 /// This creates a future representing a daemon which reloads itself when receiving a SIGHUP.
242 /// If this is started regularly, a listening socket is created. In this case, the file descriptor
243 /// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`.
244 /// If the variable already exists, its contents will instead be used to restore the listening
245 /// socket. The finished listening socket is then passed to the `create_service` function which
246 /// can be used to setup the TLS and the HTTP daemon. The returned future has to call
247 /// [systemd_notify] with [SystemdNotify::Ready] when the service is ready.
248 pub async fn create_daemon<F, S>(
249 address: std::net::SocketAddr,
250 create_service: F,
251 service_name: &str,
252 ) -> Result<(), Error>
253 where
254 F: FnOnce(tokio::net::TcpListener) -> Result<S, Error>,
255 S: Future<Output = Result<(), Error>>,
256 {
257 let mut reloader = Reloader::new()?;
258
259 let listener: tokio::net::TcpListener = reloader.restore(
260 "PROXMOX_BACKUP_LISTEN_FD",
261 move || async move { Ok(tokio::net::TcpListener::bind(&address).await?) },
262 ).await?;
263
264 let service = create_service(listener)?;
265
266 let service = async move {
267 if let Err(err) = service.await {
268 log::error!("server error: {}", err);
269 }
270 };
271
272 let server_future = Box::pin(service);
273 let shutdown_future = crate::shutdown_future();
274
275 let finish_future = match future::select(server_future, shutdown_future).await {
276 Either::Left((_, _)) => {
277 if !crate::shutdown_requested() {
278 crate::request_shutdown(); // make sure we are in shutdown mode
279 }
280 None
281 }
282 Either::Right((_, server_future)) => Some(server_future),
283 };
284
285 let mut reloader = Some(reloader);
286
287 if crate::is_reload_request() {
288 log::info!("daemon reload...");
289 if let Err(e) = systemd_notify(SystemdNotify::Reloading) {
290 log::error!("failed to notify systemd about the state change: {}", e);
291 }
292 wait_service_is_state(service_name, "reloading").await?;
293 if let Err(e) = reloader.take().unwrap().fork_restart() {
294 log::error!("error during reload: {}", e);
295 let _ = systemd_notify(SystemdNotify::Status("error during reload".to_string()));
296 }
297 } else {
298 log::info!("daemon shutting down...");
299 }
300
301 if let Some(future) = finish_future {
302 future.await;
303 }
304
305 // FIXME: this is a hack, replace with sd_notify_barrier when available
306 if crate::is_reload_request() {
307 wait_service_is_not_state(service_name, "reloading").await?;
308 }
309
310 log::info!("daemon shut down...");
311 Ok(())
312 }
313
314 // hack, do not use if unsure!
315 async fn get_service_state(service: &str) -> Result<String, Error> {
316 let text = match tokio::process::Command::new("systemctl")
317 .args(&["is-active", service])
318 .output()
319 .await
320 {
321 Ok(output) => match String::from_utf8(output.stdout) {
322 Ok(text) => text,
323 Err(err) => bail!("output of 'systemctl is-active' not valid UTF-8 - {}", err),
324 },
325 Err(err) => bail!("executing 'systemctl is-active' failed - {}", err),
326 };
327
328 Ok(text.trim().trim_start().to_string())
329 }
330
331 async fn wait_service_is_state(service: &str, state: &str) -> Result<(), Error> {
332 tokio::time::sleep(std::time::Duration::new(1, 0)).await;
333 while get_service_state(service).await? != state {
334 tokio::time::sleep(std::time::Duration::new(5, 0)).await;
335 }
336 Ok(())
337 }
338
339 async fn wait_service_is_not_state(service: &str, state: &str) -> Result<(), Error> {
340 tokio::time::sleep(std::time::Duration::new(1, 0)).await;
341 while get_service_state(service).await? == state {
342 tokio::time::sleep(std::time::Duration::new(5, 0)).await;
343 }
344 Ok(())
345 }
346
#[link(name = "systemd")]
extern "C" {
    /// Send a newline-separated list of state assignments to the service manager
    /// (see: ``man sd_notify``).
    ///
    /// Returns a negative errno-style value on failure.
    fn sd_notify(
        unset_environment: c_int,
        state: *const c_char,
    ) -> c_int;
}
351
352 /// Systemd sercice startup states (see: ``man sd_notify``)
353 pub enum SystemdNotify {
354 Ready,
355 Reloading,
356 Stopping,
357 Status(String),
358 MainPid(nix::unistd::Pid),
359 }
360
361 /// Tells systemd the startup state of the service (see: ``man sd_notify``)
362 pub fn systemd_notify(state: SystemdNotify) -> Result<(), Error> {
363 let message = match state {
364 SystemdNotify::Ready => CString::new("READY=1"),
365 SystemdNotify::Reloading => CString::new("RELOADING=1"),
366 SystemdNotify::Stopping => CString::new("STOPPING=1"),
367 SystemdNotify::Status(msg) => CString::new(format!("STATUS={}", msg)),
368 SystemdNotify::MainPid(pid) => CString::new(format!("MAINPID={}", pid)),
369 }?;
370 let rc = unsafe { sd_notify(0, message.as_ptr()) };
371 if rc < 0 {
372 bail!(
373 "systemd_notify failed: {}",
374 std::io::Error::from_raw_os_error(-rc),
375 );
376 }
377 Ok(())
378 }