]> git.proxmox.com Git - proxmox-backup.git/blame - src/server/state.rs
proxy: split out code to run garbage collection job
[proxmox-backup.git] / src / server / state.rs
CommitLineData
f7d4e4b5 1use anyhow::{Error};
7a630df7
DM
2use lazy_static::lazy_static;
3use std::sync::Mutex;
4
5use futures::*;
7a630df7 6
db0cb9ce 7use tokio::signal::unix::{signal, SignalKind};
7a630df7 8
e45afdff 9use crate::tools::{self, BroadcastData};
7a630df7
DM
10
/// Operating mode of the server process.
///
/// The global state starts out as [`ServerMode::Normal`] and is switched to
/// [`ServerMode::Shutdown`] by `server_shutdown()`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ServerMode {
    /// Regular operation; no shutdown has been requested yet.
    Normal,
    /// A shutdown (or reload) is in progress.
    Shutdown,
}
16
17pub struct ServerState {
18 pub mode: ServerMode,
e45afdff
DM
19 pub shutdown_listeners: BroadcastData<()>,
20 pub last_worker_listeners: BroadcastData<()>,
7a630df7 21 pub worker_count: usize,
3da9b7e0 22 pub internal_task_count: usize,
7a630df7
DM
23 pub reload_request: bool,
24}
25
7a630df7
DM
26lazy_static! {
27 static ref SERVER_STATE: Mutex<ServerState> = Mutex::new(ServerState {
28 mode: ServerMode::Normal,
e45afdff
DM
29 shutdown_listeners: BroadcastData::new(),
30 last_worker_listeners: BroadcastData::new(),
7a630df7 31 worker_count: 0,
3da9b7e0 32 internal_task_count: 0,
7a630df7
DM
33 reload_request: false,
34 });
35}
36
37pub fn server_state_init() -> Result<(), Error> {
38
db0cb9ce 39 let mut stream = signal(SignalKind::interrupt())?;
7a630df7 40
db0cb9ce
WB
41 let future = async move {
42 while stream.recv().await.is_some() {
43 println!("got shutdown request (SIGINT)");
44 SERVER_STATE.lock().unwrap().reload_request = false;
45 tools::request_shutdown();
46 }
47 }.boxed();
7a630df7
DM
48
49 let abort_future = last_worker_future().map_err(|_| {});
aa4110cc 50 let task = futures::future::select(future, abort_future);
7a630df7 51
aa4110cc 52 tokio::spawn(task.map(|_| ()));
7a630df7 53
db0cb9ce 54 let mut stream = signal(SignalKind::hangup())?;
7a630df7 55
db0cb9ce
WB
56 let future = async move {
57 while stream.recv().await.is_some() {
58 println!("got reload request (SIGHUP)");
59 SERVER_STATE.lock().unwrap().reload_request = true;
60 tools::request_shutdown();
61 }
62 }.boxed();
7a630df7
DM
63
64 let abort_future = last_worker_future().map_err(|_| {});
aa4110cc 65 let task = futures::future::select(future, abort_future);
7a630df7 66
aa4110cc 67 tokio::spawn(task.map(|_| ()));
7a630df7
DM
68
69 Ok(())
70}
71
72pub fn is_reload_request() -> bool {
73 let data = SERVER_STATE.lock().unwrap();
74
62ee2eb4 75 data.mode == ServerMode::Shutdown && data.reload_request
7a630df7
DM
76}
77
78pub fn server_shutdown() {
79 let mut data = SERVER_STATE.lock().unwrap();
80
81 println!("SET SHUTDOWN MODE");
82
83 data.mode = ServerMode::Shutdown;
84
e45afdff 85 data.shutdown_listeners.notify_listeners(Ok(()));
7a630df7
DM
86
87 drop(data); // unlock
88
89 check_last_worker();
90}
91
aa4110cc 92pub fn shutdown_future() -> impl Future<Output = ()> {
7a630df7 93 let mut data = SERVER_STATE.lock().unwrap();
aa4110cc
WB
94 data
95 .shutdown_listeners
96 .listen()
97 .map(|_| ())
7a630df7
DM
98}
99
aa4110cc 100pub fn last_worker_future() -> impl Future<Output = Result<(), Error>> {
7a630df7 101 let mut data = SERVER_STATE.lock().unwrap();
e45afdff 102 data.last_worker_listeners.listen()
7a630df7
DM
103}
104
105pub fn set_worker_count(count: usize) {
33a88daf
DC
106 SERVER_STATE.lock().unwrap().worker_count = count;
107
108 check_last_worker();
109}
110
111pub fn check_last_worker() {
7a630df7 112 let mut data = SERVER_STATE.lock().unwrap();
7a630df7 113
3da9b7e0 114 if !(data.mode == ServerMode::Shutdown && data.worker_count == 0 && data.internal_task_count == 0) { return; }
7a630df7 115
e45afdff 116 data.last_worker_listeners.notify_listeners(Ok(()));
7a630df7
DM
117}
118
33a88daf
DC
119/// Spawns a tokio task that will be tracked for reload
120/// and if it is finished, notify the last_worker_listener if we
121/// are in shutdown mode
122pub fn spawn_internal_task<T>(task: T)
123where
124 T: Future + Send + 'static,
125 T::Output: Send + 'static,
126{
7a630df7 127 let mut data = SERVER_STATE.lock().unwrap();
3da9b7e0 128 data.internal_task_count += 1;
7a630df7 129
33a88daf
DC
130 tokio::spawn(async move {
131 let _ = tokio::spawn(task).await; // ignore errors
7a630df7 132
33a88daf
DC
133 { // drop mutex
134 let mut data = SERVER_STATE.lock().unwrap();
3da9b7e0
TL
135 if data.internal_task_count > 0 {
136 data.internal_task_count -= 1;
33a88daf
DC
137 }
138 }
139
140 check_last_worker();
141 });
7a630df7 142}