]> git.proxmox.com Git - proxmox-backup.git/commitdiff
server: use generalized commando socket for worker tasks commands
authorThomas Lamprecht <t.lamprecht@proxmox.com>
Mon, 2 Nov 2020 18:13:36 +0000 (19:13 +0100)
committerThomas Lamprecht <t.lamprecht@proxmox.com>
Mon, 2 Nov 2020 18:48:04 +0000 (19:48 +0100)
Allows to extend the use of that socket in the future, e.g., for log
rotate re-open signaling.

To reflect this we use a more general name, and change the commandos
to a more clear namespace.

Both are actually somewhat a breaking change, but the single real
world issue it should be able to cause is, that one won't be able to
stop task from older daemons, which still use the older abstract
socket name format.

Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
src/bin/proxmox-backup-api.rs
src/bin/proxmox-backup-proxy.rs
src/server.rs
src/server/worker_task.rs
tests/worker-task-abort.rs

index c1dee2b8a7c731122fc17946d0602d1d92949646..934562574e4926757486e225774d1947220a9369 100644 (file)
@@ -52,6 +52,8 @@ async fn run() -> Result<(), Error> {
     let mut config = server::ApiConfig::new(
         buildcfg::JS_DIR, &proxmox_backup::api2::ROUTER, RpcEnvironmentType::PRIVILEGED)?;
 
+    let mut commando_sock = server::CommandoSocket::new(server::our_ctrl_sock());
+
     config.enable_file_log(buildcfg::API_ACCESS_LOG_FN)?;
 
     let rest_server = RestServer::new(config);
@@ -79,7 +81,8 @@ async fn run() -> Result<(), Error> {
     daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
 
     let init_result: Result<(), Error> = try_block!({
-        server::create_task_control_socket()?;
+        server::register_task_control_commands(&mut commando_sock)?;
+        commando_sock.spawn()?;
         server::server_state_init()?;
         Ok(())
     });
index 39254504010d195ff46e9729917ff65082647199..ce67faeea8354e8bf822ae8b87a9495fa1f8fb8d 100644 (file)
@@ -94,6 +94,8 @@ async fn run() -> Result<(), Error> {
     config.register_template("index", &indexpath)?;
     config.register_template("console", "/usr/share/pve-xtermjs/index.html.hbs")?;
 
+    let mut commando_sock = server::CommandoSocket::new(server::our_ctrl_sock());
+
     config.enable_file_log(buildcfg::API_ACCESS_LOG_FN)?;
 
     let rest_server = RestServer::new(config);
@@ -146,7 +148,8 @@ async fn run() -> Result<(), Error> {
     daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
 
     let init_result: Result<(), Error> = try_block!({
-        server::create_task_control_socket()?;
+        server::register_task_control_commands(&mut commando_sock)?;
+        commando_sock.spawn()?;
         server::server_state_init()?;
         Ok(())
     });
index f0db500e2c0d3e4c181e919adc4744357d6baea1..aa4b57ec6417fadbeef82732723075b9d96986e8 100644 (file)
@@ -4,6 +4,34 @@
 //! services. We want async IO, so this is built on top of
 //! tokio/hyper.
 
+use lazy_static::lazy_static;
+use nix::unistd::Pid;
+
+use proxmox::sys::linux::procfs::PidStat;
+
+use crate::buildcfg;
+
+lazy_static! {
+    static ref PID: i32 = unsafe { libc::getpid() };
+    static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime;
+}
+
+pub fn pid() -> i32 {
+    *PID
+}
+
+pub fn pstart() -> u64 {
+    *PSTART
+}
+
+pub fn ctrl_sock_from_pid(pid: i32) -> String {
+    format!("\0{}/control-{}.sock", buildcfg::PROXMOX_BACKUP_RUN_DIR, pid)
+}
+
+pub fn our_ctrl_sock() -> String {
+    ctrl_sock_from_pid(*PID)
+}
+
 mod environment;
 pub use environment::*;
 
index db0d7d2b17cff6a39542cef43911aac962f866c5..ff8530584d015c3c73f8b88fb262899dc1e833b9 100644 (file)
@@ -8,7 +8,6 @@ use std::sync::{Arc, Mutex};
 use anyhow::{bail, format_err, Error};
 use futures::*;
 use lazy_static::lazy_static;
-use nix::unistd::Pid;
 use serde_json::{json, Value};
 use serde::{Serialize, Deserialize};
 use tokio::sync::oneshot;
@@ -20,6 +19,7 @@ use proxmox::tools::fs::{create_path, open_file_locked, replace_file, CreateOpti
 use super::UPID;
 
 use crate::buildcfg;
+use crate::server;
 use crate::tools::logrotate::{LogRotate, LogRotateFiles};
 use crate::tools::{FileLogger, FileLogOptions};
 use crate::api2::types::Authid;
@@ -35,16 +35,16 @@ pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive");
 
 lazy_static! {
     static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
+}
 
-    static ref MY_PID: i32 = unsafe { libc::getpid() };
-    static ref MY_PID_PSTART: u64 = procfs::PidStat::read_from_pid(Pid::from_raw(*MY_PID))
-        .unwrap()
-        .starttime;
+/// checks if the task UPID refers to a worker from this process
+fn is_local_worker(upid: &UPID) -> bool {
+    upid.pid == server::pid() && upid.pstart == server::pstart()
 }
 
 /// Test if the task is still running
 pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
-    if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
+    if is_local_worker(upid) {
         return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
     }
 
@@ -52,15 +52,12 @@ pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
         return Ok(false);
     }
 
-    let socketname = format!(
-        "\0{}/proxmox-task-control-{}.sock", buildcfg::PROXMOX_BACKUP_RUN_DIR, upid.pid);
-
+    let sock = server::ctrl_sock_from_pid(upid.pid);
     let cmd = json!({
-        "command": "status",
+        "command": "worker-task-status",
         "upid": upid.to_string(),
     });
-
-    let status = super::send_command(socketname, cmd).await?;
+    let status = super::send_command(sock, cmd).await?;
 
     if let Some(active) = status.as_bool() {
         Ok(active)
@@ -71,69 +68,48 @@ pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
 
 /// Test if the task is still running (fast but inaccurate implementation)
 ///
-/// If the task is spanned from a different process, we simply return if
+/// If the task is spawned from a different process, we simply return if
 /// that process is still running. This information is good enough to detect
 /// stale tasks...
 pub fn worker_is_active_local(upid: &UPID) -> bool {
-    if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
+    if is_local_worker(upid) {
         WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
     } else {
         procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
     }
 }
 
-pub fn create_task_control_socket() -> Result<(), Error> {
-
-    let socketname = format!(
-        "\0{}/proxmox-task-control-{}.sock", buildcfg::PROXMOX_BACKUP_RUN_DIR, *MY_PID);
-
-    let control_future = super::create_control_socket(socketname, |param| {
-        let param = param
-            .as_object()
-            .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
-        if param.keys().count() != 2 { bail!("wrong number of parameters"); }
-
-        let command = param["command"]
-            .as_str()
-            .ok_or_else(|| format_err!("unable to parse parameters (missing command)"))?;
-
-        // we have only two commands for now
-        if !(command == "abort-task" || command == "status") {
-            bail!("got unknown command '{}'", command);
-        }
-
-        let upid_str = param["upid"]
-            .as_str()
-            .ok_or_else(|| format_err!("unable to parse parameters (missing upid)"))?;
-
-        let upid = upid_str.parse::<UPID>()?;
-
-        if !(upid.pid == *MY_PID && upid.pstart == *MY_PID_PSTART) {
+pub fn register_task_control_commands(
+    commando_sock: &mut super::CommandoSocket,
+) -> Result<(), Error> {
+    fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
+        let args = if let Some(args) = args { args } else { bail!("missing args") };
+        let upid = match args.get("upid") {
+            Some(Value::String(upid)) => upid.parse::<UPID>()?,
+            None => bail!("no upid in args"),
+            _ => bail!("unable to parse upid"),
+        };
+        if !is_local_worker(&upid) {
             bail!("upid does not belong to this process");
         }
+        Ok(upid)
+    }
 
-        let hash = WORKER_TASK_LIST.lock().unwrap();
+    commando_sock.register_command("worker-task-abort".into(), move |args| {
+        let upid = get_upid(args)?;
 
-        match command {
-            "abort-task" => {
-                if let Some(ref worker) = hash.get(&upid.task_id) {
-                    worker.request_abort();
-                } else {
-                    // assume task is already stopped
-                }
-                Ok(Value::Null)
-            }
-            "status" => {
-                let active = hash.contains_key(&upid.task_id);
-                Ok(active.into())
-            }
-            _ => {
-                bail!("got unknown command '{}'", command);
-            }
+        if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) {
+            worker.request_abort();
         }
+        Ok(Value::Null)
     })?;
+    commando_sock.register_command("worker-task-status".into(), move |args| {
+        let upid = get_upid(args)?;
 
-    tokio::spawn(control_future);
+        let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id);
+
+        Ok(active.into())
+    })?;
 
     Ok(())
 }
@@ -148,17 +124,12 @@ pub fn abort_worker_async(upid: UPID) {
 
 pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
 
-    let target_pid = upid.pid;
-
-    let socketname = format!(
-        "\0{}/proxmox-task-control-{}.sock", buildcfg::PROXMOX_BACKUP_RUN_DIR, target_pid);
-
+    let sock = server::ctrl_sock_from_pid(upid.pid);
     let cmd = json!({
-        "command": "abort-task",
+        "command": "worker-task-abort",
         "upid": upid.to_string(),
     });
-
-    super::send_command(socketname, cmd).map_ok(|_| ()).await
+    super::send_command(sock, cmd).map_ok(|_| ()).await
 }
 
 fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
index 3cb41e32068594aeebab49344db02a09762c71f2..5b73c8b974ae79e6267959b68ecb470b63b6d3c2 100644 (file)
@@ -42,8 +42,10 @@ fn worker_task_abort() -> Result<(), Error> {
     let mut rt = tokio::runtime::Runtime::new().unwrap();
     rt.block_on(async move {
 
+        let mut commando_sock = server::CommandoSocket::new(server::our_ctrl_sock());
+
         let init_result: Result<(), Error> = try_block!({
-            server::create_task_control_socket()?;
+            server::register_task_control_commands(&mut commando_sock)?;
             server::server_state_init()?;
             Ok(())
         });