]> git.proxmox.com Git - proxmox-backup.git/blob - src/server/state.rs
start new proxmox-rest-server workspace
[proxmox-backup.git] / src / server / state.rs
1 use anyhow::{Error};
2 use lazy_static::lazy_static;
3 use std::sync::Mutex;
4
5 use futures::*;
6
7 use tokio::signal::unix::{signal, SignalKind};
8
9 use pbs_tools::broadcast_future::BroadcastData;
10
/// Operating mode of the server process.
///
/// The mode starts out as [`ServerMode::Normal`] and is switched to
/// [`ServerMode::Shutdown`] by `server_shutdown()`.
#[derive(PartialEq, Eq, Copy, Clone, Debug)]
pub enum ServerMode {
    /// Regular operation; no shutdown has been requested yet.
    Normal,
    /// A shutdown (or reload) was requested.
    Shutdown,
}
16
17 pub struct ServerState {
18 pub mode: ServerMode,
19 pub shutdown_listeners: BroadcastData<()>,
20 pub last_worker_listeners: BroadcastData<()>,
21 pub worker_count: usize,
22 pub internal_task_count: usize,
23 pub reload_request: bool,
24 }
25
26 lazy_static! {
27 static ref SERVER_STATE: Mutex<ServerState> = Mutex::new(ServerState {
28 mode: ServerMode::Normal,
29 shutdown_listeners: BroadcastData::new(),
30 last_worker_listeners: BroadcastData::new(),
31 worker_count: 0,
32 internal_task_count: 0,
33 reload_request: false,
34 });
35 }
36
37 pub fn server_state_init() -> Result<(), Error> {
38
39 let mut stream = signal(SignalKind::interrupt())?;
40
41 let future = async move {
42 while stream.recv().await.is_some() {
43 println!("got shutdown request (SIGINT)");
44 SERVER_STATE.lock().unwrap().reload_request = false;
45 crate::tools::request_shutdown();
46 }
47 }.boxed();
48
49 let abort_future = last_worker_future().map_err(|_| {});
50 let task = futures::future::select(future, abort_future);
51
52 tokio::spawn(task.map(|_| ()));
53
54 let mut stream = signal(SignalKind::hangup())?;
55
56 let future = async move {
57 while stream.recv().await.is_some() {
58 println!("got reload request (SIGHUP)");
59 SERVER_STATE.lock().unwrap().reload_request = true;
60 crate::tools::request_shutdown();
61 }
62 }.boxed();
63
64 let abort_future = last_worker_future().map_err(|_| {});
65 let task = futures::future::select(future, abort_future);
66
67 tokio::spawn(task.map(|_| ()));
68
69 Ok(())
70 }
71
72 pub fn is_reload_request() -> bool {
73 let data = SERVER_STATE.lock().unwrap();
74
75 data.mode == ServerMode::Shutdown && data.reload_request
76 }
77
78 pub fn server_shutdown() {
79 let mut data = SERVER_STATE.lock().unwrap();
80
81 println!("SET SHUTDOWN MODE");
82
83 data.mode = ServerMode::Shutdown;
84
85 data.shutdown_listeners.notify_listeners(Ok(()));
86
87 drop(data); // unlock
88
89 check_last_worker();
90 }
91
92 pub fn shutdown_future() -> impl Future<Output = ()> {
93 let mut data = SERVER_STATE.lock().unwrap();
94 data
95 .shutdown_listeners
96 .listen()
97 .map(|_| ())
98 }
99
100 pub fn last_worker_future() -> impl Future<Output = Result<(), Error>> {
101 let mut data = SERVER_STATE.lock().unwrap();
102 data.last_worker_listeners.listen()
103 }
104
105 pub fn set_worker_count(count: usize) {
106 SERVER_STATE.lock().unwrap().worker_count = count;
107
108 check_last_worker();
109 }
110
111 pub fn check_last_worker() {
112 let mut data = SERVER_STATE.lock().unwrap();
113
114 if !(data.mode == ServerMode::Shutdown && data.worker_count == 0 && data.internal_task_count == 0) { return; }
115
116 data.last_worker_listeners.notify_listeners(Ok(()));
117 }
118
119 /// Spawns a tokio task that will be tracked for reload
120 /// and if it is finished, notify the last_worker_listener if we
121 /// are in shutdown mode
122 pub fn spawn_internal_task<T>(task: T)
123 where
124 T: Future + Send + 'static,
125 T::Output: Send + 'static,
126 {
127 let mut data = SERVER_STATE.lock().unwrap();
128 data.internal_task_count += 1;
129
130 tokio::spawn(async move {
131 let _ = tokio::spawn(task).await; // ignore errors
132
133 { // drop mutex
134 let mut data = SERVER_STATE.lock().unwrap();
135 if data.internal_task_count > 0 {
136 data.internal_task_count -= 1;
137 }
138 }
139
140 check_last_worker();
141 });
142 }