]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/server/worker_task.rs
clippy: is_some/none/ok/err/empty
[proxmox-backup.git] / src / server / worker_task.rs
index 24cfab6243857e72e0bcd4777e4b915a7aed2327..0d884ba194f717e723c7665854e0ef89e81a64bb 100644 (file)
@@ -8,7 +8,6 @@ use std::sync::{Arc, Mutex};
 use anyhow::{bail, format_err, Error};
 use futures::*;
 use lazy_static::lazy_static;
-use nix::unistd::Pid;
 use serde_json::{json, Value};
 use serde::{Serialize, Deserialize};
 use tokio::sync::oneshot;
@@ -19,50 +18,48 @@ use proxmox::tools::fs::{create_path, open_file_locked, replace_file, CreateOpti
 
 use super::UPID;
 
+use crate::buildcfg;
+use crate::server;
 use crate::tools::logrotate::{LogRotate, LogRotateFiles};
 use crate::tools::{FileLogger, FileLogOptions};
-use crate::api2::types::Authid;
+use crate::api2::types::{Authid, TaskStateType};
 
-macro_rules! PROXMOX_BACKUP_VAR_RUN_DIR_M { () => ("/run/proxmox-backup") }
-macro_rules! PROXMOX_BACKUP_LOG_DIR_M { () => ("/var/log/proxmox-backup") }
-macro_rules! PROXMOX_BACKUP_TASK_DIR_M { () => (concat!( PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks")) }
-
-pub const PROXMOX_BACKUP_VAR_RUN_DIR: &str = PROXMOX_BACKUP_VAR_RUN_DIR_M!();
-pub const PROXMOX_BACKUP_LOG_DIR: &str = PROXMOX_BACKUP_LOG_DIR_M!();
-pub const PROXMOX_BACKUP_TASK_DIR: &str = PROXMOX_BACKUP_TASK_DIR_M!();
-pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/.active.lock");
-pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/active");
-pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/index");
-pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/archive");
+macro_rules! taskdir {
+    ($subdir:expr) => (concat!(PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
+}
+pub const PROXMOX_BACKUP_TASK_DIR: &str = taskdir!("/");
+pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = taskdir!("/.active.lock");
+pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = taskdir!("/active");
+pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = taskdir!("/index");
+pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive");
 
 lazy_static! {
     static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
+}
 
-    static ref MY_PID: i32 = unsafe { libc::getpid() };
-    static ref MY_PID_PSTART: u64 = procfs::PidStat::read_from_pid(Pid::from_raw(*MY_PID))
-        .unwrap()
-        .starttime;
+/// checks if the task UPID refers to a worker from this process
+fn is_local_worker(upid: &UPID) -> bool {
+    upid.pid == server::pid() && upid.pstart == server::pstart()
 }
 
 /// Test if the task is still running
 pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
-    if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
+    if is_local_worker(upid) {
         return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
     }
 
-    if !procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() {
+    if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() {
         return Ok(false);
     }
 
-    let socketname = format!(
-        "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, upid.pid);
-
+    let sock = server::ctrl_sock_from_pid(upid.pid);
     let cmd = json!({
-        "command": "status",
-        "upid": upid.to_string(),
+        "command": "worker-task-status",
+        "args": {
+            "upid": upid.to_string(),
+        },
     });
-
-    let status = super::send_command(socketname, cmd).await?;
+    let status = super::send_command(sock, cmd).await?;
 
     if let Some(active) = status.as_bool() {
         Ok(active)
@@ -73,69 +70,48 @@ pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
 
 /// Test if the task is still running (fast but inaccurate implementation)
 ///
-/// If the task is spanned from a different process, we simply return if
+/// If the task is spawned from a different process, we simply return if
 /// that process is still running. This information is good enough to detect
 /// stale tasks...
 pub fn worker_is_active_local(upid: &UPID) -> bool {
-    if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
+    if is_local_worker(upid) {
         WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
     } else {
         procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
     }
 }
 
-pub fn create_task_control_socket() -> Result<(), Error> {
-
-    let socketname = format!(
-        "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, *MY_PID);
-
-    let control_future = super::create_control_socket(socketname, |param| {
-        let param = param
-            .as_object()
-            .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
-        if param.keys().count() != 2 { bail!("wrong number of parameters"); }
-
-        let command = param["command"]
-            .as_str()
-            .ok_or_else(|| format_err!("unable to parse parameters (missing command)"))?;
-
-        // we have only two commands for now
-        if !(command == "abort-task" || command == "status") {
-            bail!("got unknown command '{}'", command);
-        }
-
-        let upid_str = param["upid"]
-            .as_str()
-            .ok_or_else(|| format_err!("unable to parse parameters (missing upid)"))?;
-
-        let upid = upid_str.parse::<UPID>()?;
-
-        if !(upid.pid == *MY_PID && upid.pstart == *MY_PID_PSTART) {
+pub fn register_task_control_commands(
+    commando_sock: &mut super::CommandoSocket,
+) -> Result<(), Error> {
+    fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
+        let args = if let Some(args) = args { args } else { bail!("missing args") };
+        let upid = match args.get("upid") {
+            Some(Value::String(upid)) => upid.parse::<UPID>()?,
+            None => bail!("no upid in args"),
+            _ => bail!("unable to parse upid"),
+        };
+        if !is_local_worker(&upid) {
             bail!("upid does not belong to this process");
         }
+        Ok(upid)
+    }
 
-        let hash = WORKER_TASK_LIST.lock().unwrap();
+    commando_sock.register_command("worker-task-abort".into(), move |args| {
+        let upid = get_upid(args)?;
 
-        match command {
-            "abort-task" => {
-                if let Some(ref worker) = hash.get(&upid.task_id) {
-                    worker.request_abort();
-                } else {
-                    // assume task is already stopped
-                }
-                Ok(Value::Null)
-            }
-            "status" => {
-                let active = hash.contains_key(&upid.task_id);
-                Ok(active.into())
-            }
-            _ => {
-                bail!("got unknown command '{}'", command);
-            }
+        if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) {
+            worker.request_abort();
         }
+        Ok(Value::Null)
     })?;
+    commando_sock.register_command("worker-task-status".into(), move |args| {
+        let upid = get_upid(args)?;
 
-    tokio::spawn(control_future);
+        let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id);
+
+        Ok(active.into())
+    })?;
 
     Ok(())
 }
@@ -150,17 +126,14 @@ pub fn abort_worker_async(upid: UPID) {
 
 pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
 
-    let target_pid = upid.pid;
-
-    let socketname = format!(
-        "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, target_pid);
-
+    let sock = server::ctrl_sock_from_pid(upid.pid);
     let cmd = json!({
-        "command": "abort-task",
-        "upid": upid.to_string(),
+        "command": "worker-task-abort",
+        "args": {
+            "upid": upid.to_string(),
+        },
     });
-
-    super::send_command(socketname, cmd).map_ok(|_| ()).await
+    super::send_command(sock, cmd).map_ok(|_| ()).await
 }
 
 fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
@@ -189,9 +162,9 @@ pub fn create_task_log_dirs() -> Result<(), Error> {
             .owner(backup_user.uid)
             .group(backup_user.gid);
 
-        create_path(PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
+        create_path(buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
         create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
-        create_path(PROXMOX_BACKUP_VAR_RUN_DIR, None, Some(opts))?;
+        create_path(buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
         Ok(())
     }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
 
@@ -218,7 +191,7 @@ pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
     file.read_to_end(&mut data)?;
 
     // task logs should end with newline, we do not want it here
-    if data.len() > 0 && data[data.len()-1] == b'\n' {
+    if !data.is_empty() && data[data.len()-1] == b'\n' {
         data.pop();
     }
 
@@ -273,6 +246,15 @@ impl TaskState {
         }
     }
 
+    pub fn tasktype(&self) -> TaskStateType {
+        match self {
+            TaskState::OK { .. } => TaskStateType::OK,
+            TaskState::Unknown { .. } => TaskStateType::Unknown,
+            TaskState::Error { .. } => TaskStateType::Error,
+            TaskState::Warning { .. } => TaskStateType::Warning,
+        }
+    }
+
     fn result_text(&self) -> String {
         match self {
             TaskState::Error { message, .. } => format!("TASK ERROR: {}", message),
@@ -285,11 +267,11 @@ impl TaskState {
             Ok(TaskState::Unknown { endtime })
         } else if s == "OK" {
             Ok(TaskState::OK { endtime })
-        } else if s.starts_with("WARNINGS: ") {
-            let count: u64 = s[10..].parse()?;
+        } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") {
+            let count: u64 = warnings.parse()?;
             Ok(TaskState::Warning{ count, endtime })
-        } else if s.len() > 0 {
-            let message = if s.starts_with("ERROR: ") { &s[7..] } else { s }.to_string();
+        } else if !s.is_empty() {
+            let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string();
             Ok(TaskState::Error{ message, endtime })
         } else {
             bail!("unable to parse Task Status '{}'", s);
@@ -365,6 +347,9 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
     let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
     let had_index_file = !finish_list.is_empty();
 
+    // We use filter_map because one negative case wants to *move* the data into `finish_list`,
+    // clippy doesn't quite catch this!
+    #[allow(clippy::unnecessary_filter_map)]
     let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
         .into_iter()
         .filter_map(|info| {
@@ -375,10 +360,9 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
             }
 
             if !worker_is_active_local(&info.upid) {
-                println!("Detected stopped task '{}'", &info.upid_str);
+                // println!("Detected stopped task '{}'", &info.upid_str);
                 let now = proxmox::tools::time::epoch_i64();
-                let status = upid_read_status(&info.upid)
-                    .unwrap_or_else(|_| TaskState::Unknown { endtime: now });
+                let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
                 finish_list.push(TaskListInfo {
                     upid: info.upid,
                     upid_str: info.upid_str,
@@ -602,18 +586,9 @@ struct WorkerTaskData {
     pub abort_listeners: Vec<oneshot::Sender<()>>,
 }
 
-impl Drop for WorkerTask {
-
-    fn drop(&mut self) {
-        println!("unregister worker");
-    }
-}
-
 impl WorkerTask {
 
     pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: Authid, to_stdout: bool) -> Result<Arc<Self>, Error> {
-        println!("register worker");
-
         let upid = UPID::new(worker_type, worker_id, auth_id)?;
         let task_id = upid.task_id;
 
@@ -628,7 +603,7 @@ impl WorkerTask {
         path.push(upid.to_string());
 
         let logger_options = FileLogOptions {
-            to_stdout: to_stdout,
+            to_stdout,
             exclusive: true,
             prefix_time: true,
             read: true,
@@ -692,8 +667,6 @@ impl WorkerTask {
     ) -> Result<String, Error>
         where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>
     {
-        println!("register worker thread");
-
         let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
         let upid_str = worker.upid.to_string();