]>
Commit | Line | Data |
---|---|---|
53daae8e | 1 | //! Helpers to implement restartable daemons/services. |
dce94d0e WB |
2 | |
3 | use std::ffi::CString; | |
083ff3fd | 4 | use std::future::Future; |
c08fac4d | 5 | use std::io::{Read, Write}; |
3ddb1488 | 6 | use std::os::raw::{c_char, c_uchar, c_int}; |
620dccf1 | 7 | use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; |
dce94d0e WB |
8 | use std::os::unix::ffi::OsStrExt; |
9 | use std::panic::UnwindSafe; | |
3ddb1488 | 10 | use std::path::PathBuf; |
dce94d0e | 11 | |
f7d4e4b5 | 12 | use anyhow::{bail, format_err, Error}; |
9e45e03a | 13 | use futures::future::{self, Either}; |
4422ba2c | 14 | |
ca3c3ce9 | 15 | use proxmox::tools::io::{ReadExt, WriteExt}; |
8bca935f | 16 | use proxmox::tools::fd::Fd; |
ca3c3ce9 | 17 | |
8bca935f | 18 | use crate::fd_change_cloexec; |
dce94d0e | 19 | |
3ddb1488 DM |
// Binding for libsystemd's journal stream helper (see `man sd_journal_stream_fd`).
// The caller in this file treats a negative return value as failure and a
// non-negative one as a new file descriptor.
#[link(name = "systemd")]
extern "C" {
    fn sd_journal_stream_fd(
        identifier: *const c_uchar,
        priority: c_int,
        level_prefix: c_int,
    ) -> c_int;
}
24 | ||
dce94d0e | 25 | // Unfortunately FnBox is nightly-only and Box<FnOnce> is unusable, so just use Box<Fn>... |
8cf445ec | 26 | type BoxedStoreFunc = Box<dyn FnMut() -> Result<String, Error> + UnwindSafe + Send>; |
dce94d0e WB |
27 | |
28 | /// Helper trait to "store" something in the environment to be re-used after re-executing the | |
29 | /// service on a reload. | |
e4311382 | 30 | pub trait Reloadable: Sized { |
dce94d0e | 31 | fn restore(var: &str) -> Result<Self, Error>; |
620dccf1 | 32 | fn get_store_func(&self) -> Result<BoxedStoreFunc, Error>; |
dce94d0e WB |
33 | } |
34 | ||
35 | /// Manages things to be stored and reloaded upon reexec. | |
36 | /// Anything which should be restorable should be instantiated via this struct's `restore` method, | |
62ee2eb4 | 37 | #[derive(Default)] |
e4311382 | 38 | pub struct Reloader { |
dce94d0e | 39 | pre_exec: Vec<PreExecEntry>, |
3ddb1488 | 40 | self_exe: PathBuf, |
dce94d0e WB |
41 | } |
42 | ||
43 | // Currently we only need environment variables for storage, but in theory we could also add | |
44 | // variants which need temporary files or pipes... | |
45 | struct PreExecEntry { | |
46 | name: &'static str, // Feel free to change to String if necessary... | |
47 | store_fn: BoxedStoreFunc, | |
48 | } | |
49 | ||
e4311382 | 50 | impl Reloader { |
dc2ef2b5 WB |
51 | pub fn new() -> Result<Self, Error> { |
52 | Ok(Self { | |
dce94d0e | 53 | pre_exec: Vec::new(), |
dc2ef2b5 | 54 | |
3ddb1488 DM |
55 | // Get the path to our executable as PathBuf |
56 | self_exe: std::fs::read_link("/proc/self/exe")?, | |
dc2ef2b5 | 57 | }) |
dce94d0e WB |
58 | } |
59 | ||
60 | /// Restore an object from an environment variable of the given name, or, if none exists, uses | |
61 | /// the function provided in the `or_create` parameter to instantiate the new "first" instance. | |
62 | /// | |
63 | /// Values created via this method will be remembered for later re-execution. | |
083ff3fd | 64 | pub async fn restore<T, F, U>(&mut self, name: &'static str, or_create: F) -> Result<T, Error> |
dce94d0e | 65 | where |
e4311382 | 66 | T: Reloadable, |
083ff3fd WB |
67 | F: FnOnce() -> U, |
68 | U: Future<Output = Result<T, Error>>, | |
dce94d0e WB |
69 | { |
70 | let res = match std::env::var(name) { | |
71 | Ok(varstr) => T::restore(&varstr)?, | |
083ff3fd | 72 | Err(std::env::VarError::NotPresent) => or_create().await?, |
dce94d0e WB |
73 | Err(_) => bail!("variable {} has invalid value", name), |
74 | }; | |
75 | ||
76 | self.pre_exec.push(PreExecEntry { | |
77 | name, | |
620dccf1 | 78 | store_fn: res.get_store_func()?, |
dce94d0e WB |
79 | }); |
80 | Ok(res) | |
81 | } | |
82 | ||
83 | fn pre_exec(self) -> Result<(), Error> { | |
620dccf1 | 84 | for mut item in self.pre_exec { |
dce94d0e WB |
85 | std::env::set_var(item.name, (item.store_fn)()?); |
86 | } | |
87 | Ok(()) | |
88 | } | |
89 | ||
90 | pub fn fork_restart(self) -> Result<(), Error> { | |
dce94d0e WB |
91 | // Get our parameters as Vec<CString> |
92 | let args = std::env::args_os(); | |
93 | let mut new_args = Vec::with_capacity(args.len()); | |
94 | for arg in args { | |
95 | new_args.push(CString::new(arg.as_bytes())?); | |
96 | } | |
97 | ||
5e5eed5c | 98 | // Synchronisation pipe: |
c08fac4d | 99 | let (pold, pnew) = super::socketpair()?; |
5e5eed5c | 100 | |
dce94d0e WB |
101 | // Start ourselves in the background: |
102 | use nix::unistd::{fork, ForkResult}; | |
0c4c6a7b | 103 | match unsafe { fork() } { |
dce94d0e | 104 | Ok(ForkResult::Child) => { |
5e5eed5c | 105 | // Double fork so systemd can supervise us without nagging... |
0c4c6a7b | 106 | match unsafe { fork() } { |
5e5eed5c | 107 | Ok(ForkResult::Child) => { |
c08fac4d | 108 | std::mem::drop(pold); |
5e5eed5c WB |
109 | // At this point we call pre-exec helpers. We must be certain that if they fail for |
110 | // whatever reason we can still call `_exit()`, so use catch_unwind. | |
111 | match std::panic::catch_unwind(move || { | |
c08fac4d WB |
112 | let mut pnew = unsafe { |
113 | std::fs::File::from_raw_fd(pnew.into_raw_fd()) | |
5e5eed5c WB |
114 | }; |
115 | let pid = nix::unistd::Pid::this(); | |
c08fac4d | 116 | if let Err(e) = unsafe { pnew.write_host_value(pid.as_raw()) } { |
5e5eed5c WB |
117 | log::error!("failed to send new server PID to parent: {}", e); |
118 | unsafe { | |
119 | libc::_exit(-1); | |
120 | } | |
121 | } | |
c08fac4d WB |
122 | |
123 | let mut ok = [0u8]; | |
124 | if let Err(e) = pnew.read_exact(&mut ok) { | |
125 | log::error!("parent vanished before notifying systemd: {}", e); | |
126 | unsafe { | |
127 | libc::_exit(-1); | |
128 | } | |
129 | } | |
130 | assert_eq!(ok[0], 1, "reload handshake should have sent a 1 byte"); | |
131 | ||
132 | std::mem::drop(pnew); | |
3ddb1488 DM |
133 | |
134 | // Try to reopen STDOUT/STDERR journald streams to get correct PID in logs | |
135 | let ident = CString::new(self.self_exe.file_name().unwrap().as_bytes()).unwrap(); | |
136 | let ident = ident.as_bytes(); | |
137 | let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_INFO, 1) }; | |
138 | if fd >= 0 && fd != 1 { | |
139 | let fd = proxmox::tools::fd::Fd(fd); // add drop handler | |
140 | nix::unistd::dup2(fd.as_raw_fd(), 1)?; | |
141 | } else { | |
142 | log::error!("failed to update STDOUT journal redirection ({})", fd); | |
143 | } | |
144 | let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_ERR, 1) }; | |
145 | if fd >= 0 && fd != 2 { | |
146 | let fd = proxmox::tools::fd::Fd(fd); // add drop handler | |
147 | nix::unistd::dup2(fd.as_raw_fd(), 2)?; | |
148 | } else { | |
149 | log::error!("failed to update STDERR journal redirection ({})", fd); | |
150 | } | |
151 | ||
dc2ef2b5 | 152 | self.do_reexec(new_args) |
5e5eed5c WB |
153 | }) |
154 | { | |
38da8ca1 DM |
155 | Ok(Ok(())) => log::error!("do_reexec returned!"), |
156 | Ok(Err(err)) => log::error!("do_reexec failed: {}", err), | |
157 | Err(_) => log::error!("panic in re-exec"), | |
5e5eed5c WB |
158 | } |
159 | } | |
160 | Ok(ForkResult::Parent { child }) => { | |
c08fac4d | 161 | std::mem::drop((pold, pnew)); |
5e5eed5c WB |
162 | log::debug!("forked off a new server (second pid: {})", child); |
163 | } | |
164 | Err(e) => log::error!("fork() failed, restart delayed: {}", e), | |
dce94d0e WB |
165 | } |
166 | // No matter how we managed to get here, this is the time where we bail out quickly: | |
167 | unsafe { | |
168 | libc::_exit(-1) | |
169 | } | |
170 | } | |
171 | Ok(ForkResult::Parent { child }) => { | |
5e5eed5c | 172 | log::debug!("forked off a new server (first pid: {}), waiting for 2nd pid", child); |
c08fac4d WB |
173 | std::mem::drop(pnew); |
174 | let mut pold = unsafe { | |
175 | std::fs::File::from_raw_fd(pold.into_raw_fd()) | |
5e5eed5c | 176 | }; |
c08fac4d | 177 | let child = nix::unistd::Pid::from_raw(match unsafe { pold.read_le_value() } { |
5e5eed5c WB |
178 | Ok(v) => v, |
179 | Err(e) => { | |
180 | log::error!("failed to receive pid of double-forked child process: {}", e); | |
181 | // systemd will complain but won't kill the service... | |
182 | return Ok(()); | |
183 | } | |
184 | }); | |
185 | ||
d98c9a7a WB |
186 | if let Err(e) = systemd_notify(SystemdNotify::MainPid(child)) { |
187 | log::error!("failed to notify systemd about the new main pid: {}", e); | |
188 | } | |
c08fac4d WB |
189 | |
190 | // notify child that it is now the new main process: | |
191 | if let Err(e) = pold.write_all(&[1u8]) { | |
192 | log::error!("child vanished during reload: {}", e); | |
193 | } | |
194 | ||
dce94d0e WB |
195 | Ok(()) |
196 | } | |
197 | Err(e) => { | |
5e5eed5c | 198 | log::error!("fork() failed, restart delayed: {}", e); |
dce94d0e WB |
199 | Ok(()) |
200 | } | |
201 | } | |
202 | } | |
203 | ||
dc2ef2b5 | 204 | fn do_reexec(self, args: Vec<CString>) -> Result<(), Error> { |
3ddb1488 | 205 | let exe = CString::new(self.self_exe.as_os_str().as_bytes())?; |
dce94d0e WB |
206 | self.pre_exec()?; |
207 | nix::unistd::setsid()?; | |
8bf4559b | 208 | let args: Vec<&std::ffi::CStr> = args.iter().map(|s| s.as_ref()).collect(); |
dce94d0e | 209 | nix::unistd::execvp(&exe, &args)?; |
dc2ef2b5 | 210 | panic!("exec misbehaved"); |
dce94d0e WB |
211 | } |
212 | } | |
4422ba2c | 213 | |
af70c181 | 214 | // For now all we need to do is store and reuse a tcp listening socket: |
e4311382 | 215 | impl Reloadable for tokio::net::TcpListener { |
af70c181 WB |
216 | // NOTE: The socket must not be closed when the store-function is called: |
217 | // FIXME: We could become "independent" of the TcpListener and its reference to the file | |
218 | // descriptor by `dup()`ing it (and check if the listener still exists via kcmp()?) | |
620dccf1 | 219 | fn get_store_func(&self) -> Result<BoxedStoreFunc, Error> { |
8bca935f | 220 | let mut fd_opt = Some(Fd( |
620dccf1 WB |
221 | nix::fcntl::fcntl(self.as_raw_fd(), nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0))? |
222 | )); | |
223 | Ok(Box::new(move || { | |
224 | let fd = fd_opt.take().unwrap(); | |
225 | fd_change_cloexec(fd.as_raw_fd(), false)?; | |
226 | Ok(fd.into_raw_fd().to_string()) | |
227 | })) | |
af70c181 WB |
228 | } |
229 | ||
230 | fn restore(var: &str) -> Result<Self, Error> { | |
231 | let fd = var.parse::<u32>() | |
232 | .map_err(|e| format_err!("invalid file descriptor: {}", e))? | |
233 | as RawFd; | |
234 | fd_change_cloexec(fd, true)?; | |
235 | Ok(Self::from_std( | |
236 | unsafe { std::net::TcpListener::from_raw_fd(fd) }, | |
af70c181 WB |
237 | )?) |
238 | } | |
239 | } | |
a690ecac WB |
240 | |
241 | /// This creates a future representing a daemon which reloads itself when receiving a SIGHUP. | |
242 | /// If this is started regularly, a listening socket is created. In this case, the file descriptor | |
243 | /// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`. | |
244 | /// If the variable already exists, its contents will instead be used to restore the listening | |
245 | /// socket. The finished listening socket is then passed to the `create_service` function which | |
d2654200 DM |
246 | /// can be used to setup the TLS and the HTTP daemon. The returned future has to call |
247 | /// [systemd_notify] with [SystemdNotify::Ready] when the service is ready. | |
083ff3fd | 248 | pub async fn create_daemon<F, S>( |
a690ecac WB |
249 | address: std::net::SocketAddr, |
250 | create_service: F, | |
d7c6ad60 | 251 | service_name: &str, |
083ff3fd | 252 | ) -> Result<(), Error> |
a690ecac | 253 | where |
d2654200 DM |
254 | F: FnOnce(tokio::net::TcpListener) -> Result<S, Error>, |
255 | S: Future<Output = Result<(), Error>>, | |
a690ecac | 256 | { |
dc2ef2b5 | 257 | let mut reloader = Reloader::new()?; |
a690ecac WB |
258 | |
259 | let listener: tokio::net::TcpListener = reloader.restore( | |
260 | "PROXMOX_BACKUP_LISTEN_FD", | |
083ff3fd WB |
261 | move || async move { Ok(tokio::net::TcpListener::bind(&address).await?) }, |
262 | ).await?; | |
a690ecac | 263 | |
d2654200 DM |
264 | let service = create_service(listener)?; |
265 | ||
266 | let service = async move { | |
267 | if let Err(err) = service.await { | |
268 | log::error!("server error: {}", err); | |
269 | } | |
270 | }; | |
271 | ||
272 | let server_future = Box::pin(service); | |
8bca935f | 273 | let shutdown_future = crate::shutdown_future(); |
9e45e03a DC |
274 | |
275 | let finish_future = match future::select(server_future, shutdown_future).await { | |
276 | Either::Left((_, _)) => { | |
a0ffd4a4 DM |
277 | if !crate::shutdown_requested() { |
278 | crate::request_shutdown(); // make sure we are in shutdown mode | |
279 | } | |
9e45e03a DC |
280 | None |
281 | } | |
282 | Either::Right((_, server_future)) => Some(server_future), | |
283 | }; | |
a690ecac | 284 | |
a690ecac WB |
285 | let mut reloader = Some(reloader); |
286 | ||
8bca935f | 287 | if crate::is_reload_request() { |
083ff3fd WB |
288 | log::info!("daemon reload..."); |
289 | if let Err(e) = systemd_notify(SystemdNotify::Reloading) { | |
290 | log::error!("failed to notify systemd about the state change: {}", e); | |
291 | } | |
0ec79339 | 292 | wait_service_is_state(service_name, "reloading").await?; |
083ff3fd WB |
293 | if let Err(e) = reloader.take().unwrap().fork_restart() { |
294 | log::error!("error during reload: {}", e); | |
62ee2eb4 | 295 | let _ = systemd_notify(SystemdNotify::Status("error during reload".to_string())); |
083ff3fd WB |
296 | } |
297 | } else { | |
298 | log::info!("daemon shutting down..."); | |
299 | } | |
9e45e03a DC |
300 | |
301 | if let Some(future) = finish_future { | |
302 | future.await; | |
303 | } | |
d7c6ad60 DC |
304 | |
305 | // FIXME: this is a hack, replace with sd_notify_barrier when available | |
8bca935f | 306 | if crate::is_reload_request() { |
0ec79339 | 307 | wait_service_is_not_state(service_name, "reloading").await?; |
d7c6ad60 DC |
308 | } |
309 | ||
38da8ca1 | 310 | log::info!("daemon shut down."); |
083ff3fd | 311 | Ok(()) |
a690ecac | 312 | } |
9c351a36 | 313 | |
06c9059d | 314 | // hack, do not use if unsure! |
0ec79339 DC |
315 | async fn get_service_state(service: &str) -> Result<String, Error> { |
316 | let text = match tokio::process::Command::new("systemctl") | |
317 | .args(&["is-active", service]) | |
318 | .output() | |
319 | .await | |
320 | { | |
321 | Ok(output) => match String::from_utf8(output.stdout) { | |
322 | Ok(text) => text, | |
323 | Err(err) => bail!("output of 'systemctl is-active' not valid UTF-8 - {}", err), | |
324 | }, | |
325 | Err(err) => bail!("executing 'systemctl is-active' failed - {}", err), | |
326 | }; | |
327 | ||
328 | Ok(text.trim().trim_start().to_string()) | |
329 | } | |
330 | ||
331 | async fn wait_service_is_state(service: &str, state: &str) -> Result<(), Error> { | |
0a8d773a | 332 | tokio::time::sleep(std::time::Duration::new(1, 0)).await; |
0ec79339 | 333 | while get_service_state(service).await? != state { |
0a8d773a | 334 | tokio::time::sleep(std::time::Duration::new(5, 0)).await; |
0ec79339 DC |
335 | } |
336 | Ok(()) | |
337 | } | |
06c9059d | 338 | |
0ec79339 | 339 | async fn wait_service_is_not_state(service: &str, state: &str) -> Result<(), Error> { |
0a8d773a | 340 | tokio::time::sleep(std::time::Duration::new(1, 0)).await; |
0ec79339 | 341 | while get_service_state(service).await? == state { |
0a8d773a | 342 | tokio::time::sleep(std::time::Duration::new(5, 0)).await; |
d7c6ad60 | 343 | } |
0ec79339 | 344 | Ok(()) |
d7c6ad60 DC |
345 | } |
346 | ||
9c351a36 WB |
// Binding for libsystemd's service notification call (see `man sd_notify`).
// The caller in this file treats a negative return value as a negated errno.
#[link(name = "systemd")]
extern "C" {
    fn sd_notify(
        unset_environment: c_int,
        state: *const c_char,
    ) -> c_int;
}
351 | ||
53daae8e | 352 | /// Systemd sercice startup states (see: ``man sd_notify``) |
9c351a36 WB |
353 | pub enum SystemdNotify { |
354 | Ready, | |
355 | Reloading, | |
356 | Stopping, | |
357 | Status(String), | |
358 | MainPid(nix::unistd::Pid), | |
359 | } | |
360 | ||
53daae8e | 361 | /// Tells systemd the startup state of the service (see: ``man sd_notify``) |
9c351a36 | 362 | pub fn systemd_notify(state: SystemdNotify) -> Result<(), Error> { |
38da8ca1 DM |
363 | |
364 | if let SystemdNotify::Ready = &state { | |
365 | log::info!("service is ready"); | |
366 | } | |
367 | ||
9c351a36 WB |
368 | let message = match state { |
369 | SystemdNotify::Ready => CString::new("READY=1"), | |
370 | SystemdNotify::Reloading => CString::new("RELOADING=1"), | |
371 | SystemdNotify::Stopping => CString::new("STOPPING=1"), | |
372 | SystemdNotify::Status(msg) => CString::new(format!("STATUS={}", msg)), | |
373 | SystemdNotify::MainPid(pid) => CString::new(format!("MAINPID={}", pid)), | |
374 | }?; | |
375 | let rc = unsafe { sd_notify(0, message.as_ptr()) }; | |
376 | if rc < 0 { | |
377 | bail!( | |
378 | "systemd_notify failed: {}", | |
379 | std::io::Error::from_raw_os_error(-rc), | |
380 | ); | |
381 | } | |
38da8ca1 | 382 | |
9c351a36 WB |
383 | Ok(()) |
384 | } |