]> git.proxmox.com Git - proxmox-backup.git/commitdiff
implement server state/signal handling, depend on tokio-signal
authorDietmar Maurer <dietmar@proxmox.com>
Mon, 8 Apr 2019 10:21:29 +0000 (12:21 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Mon, 8 Apr 2019 11:59:07 +0000 (13:59 +0200)
Cargo.toml
src/server.rs
src/server/state.rs [new file with mode: 0644]
src/server/worker_task.rs
src/tools.rs

index 5cebf940a4cf9a87e8eb0b2147a440c3e2f6b4e5..dc2eec576494171266167ee12d71101debe80496 100644 (file)
@@ -21,8 +21,9 @@ futures = "0.1"
 tokio-threadpool = "0.1"
 tokio = "0.1"
 tokio-fs = "0.1"
-tokio-tls = "0.2.1"
-native-tls = "0.2.2"
+tokio-tls = "0.2"
+tokio-signal = "0.2"
+native-tls = "0.2"
 http = "0.1"
 hyper = "0.12"
 hyper-tls = "0.3"
index 1c91042b6bf4dbf76f6a66e9294dc8e5c21f068e..0786ba38e2d47a144a1788016de8ed07b2de99f4 100644 (file)
@@ -7,9 +7,14 @@
 mod environment;
 pub use environment::*;
 
+mod state;
+pub use state::*;
+
 mod worker_task;
 pub use worker_task::*;
+
 pub mod formatter;
+
 #[macro_use]
 pub mod rest;
 
diff --git a/src/server/state.rs b/src/server/state.rs
new file mode 100644 (file)
index 0000000..f7ee430
--- /dev/null
@@ -0,0 +1,151 @@
+use failure::*;
+use lazy_static::lazy_static;
+use std::sync::Mutex;
+
+use futures::*;
+use futures::stream::Stream;
+
+use tokio::sync::oneshot;
+use tokio_signal::unix::{Signal, SIGHUP, SIGINT};
+
+use crate::tools;
+
+#[derive(PartialEq, Copy, Clone, Debug)]
+pub enum ServerMode {
+    Normal,
+    Shutdown,
+}
+
+pub struct ServerState {
+    pub mode: ServerMode,
+    pub shutdown_listeners: Vec<oneshot::Sender<()>>,
+    pub last_worker_listeners: Vec<oneshot::Sender<()>>,
+    pub worker_count: usize,
+    pub reload_request: bool,
+}
+
+
+lazy_static! {
+    static ref SERVER_STATE: Mutex<ServerState> = Mutex::new(ServerState {
+        mode: ServerMode::Normal,
+        shutdown_listeners: vec![],
+        last_worker_listeners: vec![],
+        worker_count: 0,
+        reload_request: false,
+    });
+}
+
+pub fn server_state_init() -> Result<(), Error> {
+
+    let stream = Signal::new(SIGINT).flatten_stream();
+
+    let future = stream.for_each(|_| {
+        println!("got shutdown request (SIGINT)");
+        SERVER_STATE.lock().unwrap().reload_request = false;
+        tools::request_shutdown();
+        Ok(())
+    }).map_err(|_| {});
+
+    let abort_future = last_worker_future().map_err(|_| {});
+    let task = future.select(abort_future);
+
+    tokio::spawn(task.map(|_| {}).map_err(|_| {}));
+
+    let stream = Signal::new(SIGHUP).flatten_stream();
+
+    let future = stream.for_each(|_| {
+        println!("got reload request (SIGHUP)");
+        SERVER_STATE.lock().unwrap().reload_request = true;
+        tools::request_shutdown();
+        Ok(())
+    }).map_err(|_| {});
+
+    let abort_future = last_worker_future().map_err(|_| {});
+    let task = future.select(abort_future);
+
+    tokio::spawn(task.map(|_| {}).map_err(|_| {}));
+
+    Ok(())
+}
+
+pub fn is_reload_request() -> bool {
+    let data = SERVER_STATE.lock().unwrap();
+
+    if data.mode == ServerMode::Shutdown && data.reload_request {
+        true
+    } else {
+        false
+    }
+}
+
+pub fn server_shutdown() {
+    let mut data = SERVER_STATE.lock().unwrap();
+
+    println!("SET SHUTDOWN MODE");
+
+    data.mode = ServerMode::Shutdown;
+
+    notify_listeners(&mut data.shutdown_listeners);
+
+    drop(data); // unlock
+
+    check_last_worker();
+}
+
+pub fn shutdown_future() -> oneshot::Receiver<()> {
+    let (tx, rx) = oneshot::channel::<()>();
+
+    let mut data = SERVER_STATE.lock().unwrap();
+    match data.mode {
+        ServerMode::Normal => { data.shutdown_listeners.push(tx); },
+        ServerMode::Shutdown =>  { let _ = tx.send(()); },
+    }
+
+    rx
+}
+
+pub fn last_worker_future() -> oneshot::Receiver<()> {
+    let (tx, rx) = oneshot::channel::<()>();
+
+    let mut data = SERVER_STATE.lock().unwrap();
+    if data.mode == ServerMode::Shutdown && data.worker_count == 0 {
+        let _ = tx.send(());
+    } else {
+        data.last_worker_listeners.push(tx);
+    }
+
+    rx
+}
+
+pub fn set_worker_count(count: usize) {
+    let mut data = SERVER_STATE.lock().unwrap();
+    data.worker_count = count;
+
+    if !(data.mode == ServerMode::Shutdown && data.worker_count == 0) { return; }
+
+    notify_listeners(&mut data.last_worker_listeners);
+}
+
+
+pub fn check_last_worker() {
+
+    let mut data = SERVER_STATE.lock().unwrap();
+
+    if !(data.mode == ServerMode::Shutdown && data.worker_count == 0) { return; }
+
+    notify_listeners(&mut data.last_worker_listeners);
+}
+
+fn notify_listeners(list: &mut Vec<oneshot::Sender<()>>) {
+    loop {
+        match list.pop() {
+            None => { break; },
+            Some(ch) => {
+                println!("SEND ABORT");
+                if let Err(_) = ch.send(()) {
+                    eprintln!("SEND ABORT failed");
+                }
+            },
+        }
+    }
+}
index 2b9ae069386d2ead59429d2dc37177340a463fb5..49e5a140d62dc7f8542ddce06657c8d4a58cf09e 100644 (file)
@@ -410,7 +410,10 @@ impl WorkerTask {
             }),
         });
 
-        WORKER_TASK_LIST.lock().unwrap().insert(task_id, worker.clone());
+        let mut hash = WORKER_TASK_LIST.lock().unwrap();
+
+        hash.insert(task_id, worker.clone());
+        super::set_worker_count(hash.len());
 
         Ok(worker)
     }
@@ -434,6 +437,7 @@ impl WorkerTask {
             WORKER_TASK_LIST.lock().unwrap().remove(&task_id);
             worker.log_result(result);
             let _ = update_active_workers(None);
+            super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
             Ok(())
         }));
 
@@ -464,6 +468,7 @@ impl WorkerTask {
             worker.log_result(result);
             let _ = update_active_workers(None);
             p.send(()).unwrap();
+            super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
         });
 
         tokio::spawn(c.then(|_| Ok(())));
index bbf8f6473d591566091b9176dfc4ae3acd49ee6f..fda9829d8e4ad4b4fd1cc4a76771bb54ffeebd94 100644 (file)
@@ -627,6 +627,7 @@ static mut SHUTDOWN_REQUESTED: bool = false;
 
 pub fn request_shutdown() {
     unsafe { SHUTDOWN_REQUESTED = true; }
+    crate::server::server_shutdown();
 }
 
 #[inline(always)]