git.proxmox.com Git - proxmox-backup.git/blob - proxmox-rest-server/src/state.rs
proxmox-rest-server: improve logging
[proxmox-backup.git] / proxmox-rest-server / src / state.rs
1 use anyhow::{Error};
2 use lazy_static::lazy_static;
3 use std::sync::Mutex;
4
5 use futures::*;
6
7 use tokio::signal::unix::{signal, SignalKind};
8
9 use pbs_tools::broadcast_future::BroadcastData;
10
/// Operating mode of the server process.
///
/// Equality on this field-less enum is total, so `Eq` is derived alongside
/// `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerMode {
    /// Regular operation; this is the mode the global state starts in.
    Normal,
    /// A shutdown (or reload) was requested via `server_shutdown`.
    Shutdown,
}
16
17 pub struct ServerState {
18 pub mode: ServerMode,
19 pub shutdown_listeners: BroadcastData<()>,
20 pub last_worker_listeners: BroadcastData<()>,
21 pub worker_count: usize,
22 pub internal_task_count: usize,
23 pub reload_request: bool,
24 }
25
26 lazy_static! {
27 static ref SERVER_STATE: Mutex<ServerState> = Mutex::new(ServerState {
28 mode: ServerMode::Normal,
29 shutdown_listeners: BroadcastData::new(),
30 last_worker_listeners: BroadcastData::new(),
31 worker_count: 0,
32 internal_task_count: 0,
33 reload_request: false,
34 });
35 }
36
37 /// Listen to ``SIGINT`` and ``SIGHUP`` signals
38 ///
39 /// * ``SIGINT``: start server shutdown
40 ///
41 /// * ``SIGHUP``: start server reload
42 pub fn catch_shutdown_and_reload_signals() -> Result<(), Error> {
43
44 let mut stream = signal(SignalKind::interrupt())?;
45
46 let future = async move {
47 while stream.recv().await.is_some() {
48 log::info!("got shutdown request (SIGINT)");
49 SERVER_STATE.lock().unwrap().reload_request = false;
50 crate::request_shutdown();
51 }
52 }.boxed();
53
54 let abort_future = last_worker_future().map_err(|_| {});
55 let task = futures::future::select(future, abort_future);
56
57 tokio::spawn(task.map(|_| ()));
58
59 let mut stream = signal(SignalKind::hangup())?;
60
61 let future = async move {
62 while stream.recv().await.is_some() {
63 log::info!("got reload request (SIGHUP)");
64 SERVER_STATE.lock().unwrap().reload_request = true;
65 crate::request_shutdown();
66 }
67 }.boxed();
68
69 let abort_future = last_worker_future().map_err(|_| {});
70 let task = futures::future::select(future, abort_future);
71
72 tokio::spawn(task.map(|_| ()));
73
74 Ok(())
75 }
76
77 pub fn is_reload_request() -> bool {
78 let data = SERVER_STATE.lock().unwrap();
79
80 data.mode == ServerMode::Shutdown && data.reload_request
81 }
82
83 pub fn server_shutdown() {
84 let mut data = SERVER_STATE.lock().unwrap();
85
86 log::info!("request_shutdown");
87
88 data.mode = ServerMode::Shutdown;
89
90 data.shutdown_listeners.notify_listeners(Ok(()));
91
92 drop(data); // unlock
93
94 check_last_worker();
95 }
96
97 pub fn shutdown_future() -> impl Future<Output = ()> {
98 let mut data = SERVER_STATE.lock().unwrap();
99 data
100 .shutdown_listeners
101 .listen()
102 .map(|_| ())
103 }
104
105 pub fn last_worker_future() -> impl Future<Output = Result<(), Error>> {
106 let mut data = SERVER_STATE.lock().unwrap();
107 data.last_worker_listeners.listen()
108 }
109
110 pub fn set_worker_count(count: usize) {
111 SERVER_STATE.lock().unwrap().worker_count = count;
112
113 check_last_worker();
114 }
115
116 pub fn check_last_worker() {
117 let mut data = SERVER_STATE.lock().unwrap();
118
119 if !(data.mode == ServerMode::Shutdown && data.worker_count == 0 && data.internal_task_count == 0) { return; }
120
121 data.last_worker_listeners.notify_listeners(Ok(()));
122 }
123
124 /// Spawns a tokio task that will be tracked for reload
125 /// and if it is finished, notify the last_worker_listener if we
126 /// are in shutdown mode
127 pub fn spawn_internal_task<T>(task: T)
128 where
129 T: Future + Send + 'static,
130 T::Output: Send + 'static,
131 {
132 let mut data = SERVER_STATE.lock().unwrap();
133 data.internal_task_count += 1;
134
135 tokio::spawn(async move {
136 let _ = tokio::spawn(task).await; // ignore errors
137
138 { // drop mutex
139 let mut data = SERVER_STATE.lock().unwrap();
140 if data.internal_task_count > 0 {
141 data.internal_task_count -= 1;
142 }
143 }
144
145 check_last_worker();
146 });
147 }