]> git.proxmox.com Git - proxmox-backup.git/blob - proxmox-rest-server/src/state.rs
955e3ce350e3c6e3303b752de9b1f53c2b883b4a
[proxmox-backup.git] / proxmox-rest-server / src / state.rs
1 use anyhow::{Error};
2 use lazy_static::lazy_static;
3 use std::sync::Mutex;
4
5 use futures::*;
6
7 use tokio::signal::unix::{signal, SignalKind};
8
9 use pbs_tools::broadcast_future::BroadcastData;
10
11 use crate::request_shutdown;
12
/// Lifecycle phase of the server as tracked in the global server state.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ServerMode {
    /// Initial phase; no shutdown has been requested so far.
    Normal,
    /// Entered once `server_shutdown` has been called.
    Shutdown,
}
18
19 struct ServerState {
20 mode: ServerMode,
21 shutdown_listeners: BroadcastData<()>,
22 last_worker_listeners: BroadcastData<()>,
23 worker_count: usize,
24 internal_task_count: usize,
25 reload_request: bool,
26 }
27
28 lazy_static! {
29 static ref SERVER_STATE: Mutex<ServerState> = Mutex::new(ServerState {
30 mode: ServerMode::Normal,
31 shutdown_listeners: BroadcastData::new(),
32 last_worker_listeners: BroadcastData::new(),
33 worker_count: 0,
34 internal_task_count: 0,
35 reload_request: false,
36 });
37 }
38
39 /// Listen to ``SIGINT`` for server shutdown
40 ///
41 /// This calls [request_shutdown] when receiving the signal.
42 pub fn catch_shutdown_signal() -> Result<(), Error> {
43
44 let mut stream = signal(SignalKind::interrupt())?;
45
46 let future = async move {
47 while stream.recv().await.is_some() {
48 log::info!("got shutdown request (SIGINT)");
49 SERVER_STATE.lock().unwrap().reload_request = false;
50 request_shutdown();
51 }
52 }.boxed();
53
54 let abort_future = last_worker_future().map_err(|_| {});
55 let task = futures::future::select(future, abort_future);
56
57 tokio::spawn(task.map(|_| ()));
58
59 Ok(())
60 }
61
62 /// Listen to ``SIGHUP`` for server reload
63 ///
64 /// This calls [request_shutdown] when receiving the signal, and tries
65 /// to restart the server.
66 pub fn catch_reload_signal() -> Result<(), Error> {
67
68 let mut stream = signal(SignalKind::hangup())?;
69
70 let future = async move {
71 while stream.recv().await.is_some() {
72 log::info!("got reload request (SIGHUP)");
73 SERVER_STATE.lock().unwrap().reload_request = true;
74 crate::request_shutdown();
75 }
76 }.boxed();
77
78 let abort_future = last_worker_future().map_err(|_| {});
79 let task = futures::future::select(future, abort_future);
80
81 tokio::spawn(task.map(|_| ()));
82
83 Ok(())
84 }
85
86 pub(crate) fn is_reload_request() -> bool {
87 let data = SERVER_STATE.lock().unwrap();
88
89 data.mode == ServerMode::Shutdown && data.reload_request
90 }
91
92
93 pub(crate) fn server_shutdown() {
94 let mut data = SERVER_STATE.lock().unwrap();
95
96 log::info!("request_shutdown");
97
98 data.mode = ServerMode::Shutdown;
99
100 data.shutdown_listeners.notify_listeners(Ok(()));
101
102 drop(data); // unlock
103
104 check_last_worker();
105 }
106
107 /// Future to signal server shutdown
108 pub fn shutdown_future() -> impl Future<Output = ()> {
109 let mut data = SERVER_STATE.lock().unwrap();
110 data
111 .shutdown_listeners
112 .listen()
113 .map(|_| ())
114 }
115
116 /// Future to signal when last worker task finished
117 pub fn last_worker_future() -> impl Future<Output = Result<(), Error>> {
118 let mut data = SERVER_STATE.lock().unwrap();
119 data.last_worker_listeners.listen()
120 }
121
122 pub(crate) fn set_worker_count(count: usize) {
123 SERVER_STATE.lock().unwrap().worker_count = count;
124
125 check_last_worker();
126 }
127
128 pub(crate) fn check_last_worker() {
129 let mut data = SERVER_STATE.lock().unwrap();
130
131 if !(data.mode == ServerMode::Shutdown && data.worker_count == 0 && data.internal_task_count == 0) { return; }
132
133 data.last_worker_listeners.notify_listeners(Ok(()));
134 }
135
136 /// Spawns a tokio task that will be tracked for reload
137 /// and if it is finished, notify the [last_worker_future] if we
138 /// are in shutdown mode.
139 pub fn spawn_internal_task<T>(task: T)
140 where
141 T: Future + Send + 'static,
142 T::Output: Send + 'static,
143 {
144 let mut data = SERVER_STATE.lock().unwrap();
145 data.internal_task_count += 1;
146
147 tokio::spawn(async move {
148 let _ = tokio::spawn(task).await; // ignore errors
149
150 { // drop mutex
151 let mut data = SERVER_STATE.lock().unwrap();
152 if data.internal_task_count > 0 {
153 data.internal_task_count -= 1;
154 }
155 }
156
157 check_last_worker();
158 });
159 }