]>
Commit | Line | Data |
---|---|---|
f7d4e4b5 | 1 | use anyhow::{Error}; |
7a630df7 DM |
2 | use lazy_static::lazy_static; |
3 | use std::sync::Mutex; | |
4 | ||
5 | use futures::*; | |
7a630df7 | 6 | |
db0cb9ce | 7 | use tokio::signal::unix::{signal, SignalKind}; |
7a630df7 | 8 | |
ccc3896f | 9 | use pbs_tools::broadcast_future::BroadcastData; |
7a630df7 DM |
10 | |
/// Operating mode of the server process.
#[derive(PartialEq, Copy, Clone, Debug)]
pub enum ServerMode {
    /// Initial mode; no shutdown has been requested so far.
    Normal,
    /// A shutdown (possibly as part of a reload) has been requested.
    Shutdown,
}
16 | ||
17 | pub struct ServerState { | |
18 | pub mode: ServerMode, | |
e45afdff DM |
19 | pub shutdown_listeners: BroadcastData<()>, |
20 | pub last_worker_listeners: BroadcastData<()>, | |
7a630df7 | 21 | pub worker_count: usize, |
3da9b7e0 | 22 | pub internal_task_count: usize, |
7a630df7 DM |
23 | pub reload_request: bool, |
24 | } | |
25 | ||
7a630df7 DM |
26 | lazy_static! { |
27 | static ref SERVER_STATE: Mutex<ServerState> = Mutex::new(ServerState { | |
28 | mode: ServerMode::Normal, | |
e45afdff DM |
29 | shutdown_listeners: BroadcastData::new(), |
30 | last_worker_listeners: BroadcastData::new(), | |
7a630df7 | 31 | worker_count: 0, |
3da9b7e0 | 32 | internal_task_count: 0, |
7a630df7 DM |
33 | reload_request: false, |
34 | }); | |
35 | } | |
36 | ||
38da8ca1 DM |
37 | /// Listen to ``SIGINT`` and ``SIGHUP`` signals |
38 | /// | |
39 | /// * ``SIGINT``: start server shutdown | |
40 | /// | |
41 | /// * ``SIGHUP``: start server reload | |
42 | pub fn catch_shutdown_and_reload_signals() -> Result<(), Error> { | |
7a630df7 | 43 | |
db0cb9ce | 44 | let mut stream = signal(SignalKind::interrupt())?; |
7a630df7 | 45 | |
db0cb9ce WB |
46 | let future = async move { |
47 | while stream.recv().await.is_some() { | |
38da8ca1 | 48 | log::info!("got shutdown request (SIGINT)"); |
db0cb9ce | 49 | SERVER_STATE.lock().unwrap().reload_request = false; |
fd6d2438 | 50 | crate::request_shutdown(); |
db0cb9ce WB |
51 | } |
52 | }.boxed(); | |
7a630df7 DM |
53 | |
54 | let abort_future = last_worker_future().map_err(|_| {}); | |
aa4110cc | 55 | let task = futures::future::select(future, abort_future); |
7a630df7 | 56 | |
aa4110cc | 57 | tokio::spawn(task.map(|_| ())); |
7a630df7 | 58 | |
db0cb9ce | 59 | let mut stream = signal(SignalKind::hangup())?; |
7a630df7 | 60 | |
db0cb9ce WB |
61 | let future = async move { |
62 | while stream.recv().await.is_some() { | |
38da8ca1 | 63 | log::info!("got reload request (SIGHUP)"); |
db0cb9ce | 64 | SERVER_STATE.lock().unwrap().reload_request = true; |
fd6d2438 | 65 | crate::request_shutdown(); |
db0cb9ce WB |
66 | } |
67 | }.boxed(); | |
7a630df7 DM |
68 | |
69 | let abort_future = last_worker_future().map_err(|_| {}); | |
aa4110cc | 70 | let task = futures::future::select(future, abort_future); |
7a630df7 | 71 | |
aa4110cc | 72 | tokio::spawn(task.map(|_| ())); |
7a630df7 DM |
73 | |
74 | Ok(()) | |
75 | } | |
76 | ||
77 | pub fn is_reload_request() -> bool { | |
78 | let data = SERVER_STATE.lock().unwrap(); | |
79 | ||
62ee2eb4 | 80 | data.mode == ServerMode::Shutdown && data.reload_request |
7a630df7 DM |
81 | } |
82 | ||
83 | pub fn server_shutdown() { | |
84 | let mut data = SERVER_STATE.lock().unwrap(); | |
85 | ||
38da8ca1 | 86 | log::info!("request_shutdown"); |
7a630df7 DM |
87 | |
88 | data.mode = ServerMode::Shutdown; | |
89 | ||
e45afdff | 90 | data.shutdown_listeners.notify_listeners(Ok(())); |
7a630df7 DM |
91 | |
92 | drop(data); // unlock | |
93 | ||
94 | check_last_worker(); | |
95 | } | |
96 | ||
aa4110cc | 97 | pub fn shutdown_future() -> impl Future<Output = ()> { |
7a630df7 | 98 | let mut data = SERVER_STATE.lock().unwrap(); |
aa4110cc WB |
99 | data |
100 | .shutdown_listeners | |
101 | .listen() | |
102 | .map(|_| ()) | |
7a630df7 DM |
103 | } |
104 | ||
aa4110cc | 105 | pub fn last_worker_future() -> impl Future<Output = Result<(), Error>> { |
7a630df7 | 106 | let mut data = SERVER_STATE.lock().unwrap(); |
e45afdff | 107 | data.last_worker_listeners.listen() |
7a630df7 DM |
108 | } |
109 | ||
110 | pub fn set_worker_count(count: usize) { | |
33a88daf DC |
111 | SERVER_STATE.lock().unwrap().worker_count = count; |
112 | ||
113 | check_last_worker(); | |
114 | } | |
115 | ||
116 | pub fn check_last_worker() { | |
7a630df7 | 117 | let mut data = SERVER_STATE.lock().unwrap(); |
7a630df7 | 118 | |
3da9b7e0 | 119 | if !(data.mode == ServerMode::Shutdown && data.worker_count == 0 && data.internal_task_count == 0) { return; } |
7a630df7 | 120 | |
e45afdff | 121 | data.last_worker_listeners.notify_listeners(Ok(())); |
7a630df7 DM |
122 | } |
123 | ||
33a88daf DC |
124 | /// Spawns a tokio task that will be tracked for reload |
125 | /// and if it is finished, notify the last_worker_listener if we | |
126 | /// are in shutdown mode | |
127 | pub fn spawn_internal_task<T>(task: T) | |
128 | where | |
129 | T: Future + Send + 'static, | |
130 | T::Output: Send + 'static, | |
131 | { | |
7a630df7 | 132 | let mut data = SERVER_STATE.lock().unwrap(); |
3da9b7e0 | 133 | data.internal_task_count += 1; |
7a630df7 | 134 | |
33a88daf DC |
135 | tokio::spawn(async move { |
136 | let _ = tokio::spawn(task).await; // ignore errors | |
7a630df7 | 137 | |
33a88daf DC |
138 | { // drop mutex |
139 | let mut data = SERVER_STATE.lock().unwrap(); | |
3da9b7e0 TL |
140 | if data.internal_task_count > 0 { |
141 | data.internal_task_count -= 1; | |
33a88daf DC |
142 | } |
143 | } | |
144 | ||
145 | check_last_worker(); | |
146 | }); | |
7a630df7 | 147 | } |