]>
Commit | Line | Data |
---|---|---|
f7d4e4b5 | 1 | use anyhow::{Error}; |
7a630df7 DM |
2 | use lazy_static::lazy_static; |
3 | use std::sync::Mutex; | |
4 | ||
5 | use futures::*; | |
7a630df7 | 6 | |
db0cb9ce | 7 | use tokio::signal::unix::{signal, SignalKind}; |
7a630df7 | 8 | |
e45afdff | 9 | use crate::tools::{self, BroadcastData}; |
7a630df7 DM |
10 | |
11 | #[derive(PartialEq, Copy, Clone, Debug)] | |
12 | pub enum ServerMode { | |
13 | Normal, | |
14 | Shutdown, | |
15 | } | |
16 | ||
17 | pub struct ServerState { | |
18 | pub mode: ServerMode, | |
e45afdff DM |
19 | pub shutdown_listeners: BroadcastData<()>, |
20 | pub last_worker_listeners: BroadcastData<()>, | |
7a630df7 | 21 | pub worker_count: usize, |
3da9b7e0 | 22 | pub internal_task_count: usize, |
7a630df7 DM |
23 | pub reload_request: bool, |
24 | } | |
25 | ||
7a630df7 DM |
26 | lazy_static! { |
27 | static ref SERVER_STATE: Mutex<ServerState> = Mutex::new(ServerState { | |
28 | mode: ServerMode::Normal, | |
e45afdff DM |
29 | shutdown_listeners: BroadcastData::new(), |
30 | last_worker_listeners: BroadcastData::new(), | |
7a630df7 | 31 | worker_count: 0, |
3da9b7e0 | 32 | internal_task_count: 0, |
7a630df7 DM |
33 | reload_request: false, |
34 | }); | |
35 | } | |
36 | ||
37 | pub fn server_state_init() -> Result<(), Error> { | |
38 | ||
db0cb9ce | 39 | let mut stream = signal(SignalKind::interrupt())?; |
7a630df7 | 40 | |
db0cb9ce WB |
41 | let future = async move { |
42 | while stream.recv().await.is_some() { | |
43 | println!("got shutdown request (SIGINT)"); | |
44 | SERVER_STATE.lock().unwrap().reload_request = false; | |
45 | tools::request_shutdown(); | |
46 | } | |
47 | }.boxed(); | |
7a630df7 DM |
48 | |
49 | let abort_future = last_worker_future().map_err(|_| {}); | |
aa4110cc | 50 | let task = futures::future::select(future, abort_future); |
7a630df7 | 51 | |
aa4110cc | 52 | tokio::spawn(task.map(|_| ())); |
7a630df7 | 53 | |
db0cb9ce | 54 | let mut stream = signal(SignalKind::hangup())?; |
7a630df7 | 55 | |
db0cb9ce WB |
56 | let future = async move { |
57 | while stream.recv().await.is_some() { | |
58 | println!("got reload request (SIGHUP)"); | |
59 | SERVER_STATE.lock().unwrap().reload_request = true; | |
60 | tools::request_shutdown(); | |
61 | } | |
62 | }.boxed(); | |
7a630df7 DM |
63 | |
64 | let abort_future = last_worker_future().map_err(|_| {}); | |
aa4110cc | 65 | let task = futures::future::select(future, abort_future); |
7a630df7 | 66 | |
aa4110cc | 67 | tokio::spawn(task.map(|_| ())); |
7a630df7 DM |
68 | |
69 | Ok(()) | |
70 | } | |
71 | ||
72 | pub fn is_reload_request() -> bool { | |
73 | let data = SERVER_STATE.lock().unwrap(); | |
74 | ||
62ee2eb4 | 75 | data.mode == ServerMode::Shutdown && data.reload_request |
7a630df7 DM |
76 | } |
77 | ||
78 | pub fn server_shutdown() { | |
79 | let mut data = SERVER_STATE.lock().unwrap(); | |
80 | ||
81 | println!("SET SHUTDOWN MODE"); | |
82 | ||
83 | data.mode = ServerMode::Shutdown; | |
84 | ||
e45afdff | 85 | data.shutdown_listeners.notify_listeners(Ok(())); |
7a630df7 DM |
86 | |
87 | drop(data); // unlock | |
88 | ||
89 | check_last_worker(); | |
90 | } | |
91 | ||
aa4110cc | 92 | pub fn shutdown_future() -> impl Future<Output = ()> { |
7a630df7 | 93 | let mut data = SERVER_STATE.lock().unwrap(); |
aa4110cc WB |
94 | data |
95 | .shutdown_listeners | |
96 | .listen() | |
97 | .map(|_| ()) | |
7a630df7 DM |
98 | } |
99 | ||
aa4110cc | 100 | pub fn last_worker_future() -> impl Future<Output = Result<(), Error>> { |
7a630df7 | 101 | let mut data = SERVER_STATE.lock().unwrap(); |
e45afdff | 102 | data.last_worker_listeners.listen() |
7a630df7 DM |
103 | } |
104 | ||
105 | pub fn set_worker_count(count: usize) { | |
33a88daf DC |
106 | SERVER_STATE.lock().unwrap().worker_count = count; |
107 | ||
108 | check_last_worker(); | |
109 | } | |
110 | ||
111 | pub fn check_last_worker() { | |
7a630df7 | 112 | let mut data = SERVER_STATE.lock().unwrap(); |
7a630df7 | 113 | |
3da9b7e0 | 114 | if !(data.mode == ServerMode::Shutdown && data.worker_count == 0 && data.internal_task_count == 0) { return; } |
7a630df7 | 115 | |
e45afdff | 116 | data.last_worker_listeners.notify_listeners(Ok(())); |
7a630df7 DM |
117 | } |
118 | ||
33a88daf DC |
119 | /// Spawns a tokio task that will be tracked for reload |
120 | /// and if it is finished, notify the last_worker_listener if we | |
121 | /// are in shutdown mode | |
122 | pub fn spawn_internal_task<T>(task: T) | |
123 | where | |
124 | T: Future + Send + 'static, | |
125 | T::Output: Send + 'static, | |
126 | { | |
7a630df7 | 127 | let mut data = SERVER_STATE.lock().unwrap(); |
3da9b7e0 | 128 | data.internal_task_count += 1; |
7a630df7 | 129 | |
33a88daf DC |
130 | tokio::spawn(async move { |
131 | let _ = tokio::spawn(task).await; // ignore errors | |
7a630df7 | 132 | |
33a88daf DC |
133 | { // drop mutex |
134 | let mut data = SERVER_STATE.lock().unwrap(); | |
3da9b7e0 TL |
135 | if data.internal_task_count > 0 { |
136 | data.internal_task_count -= 1; | |
33a88daf DC |
137 | } |
138 | } | |
139 | ||
140 | check_last_worker(); | |
141 | }); | |
7a630df7 | 142 | } |