]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/server/worker_task.rs
clippy: is_some/none/ok/err/empty
[proxmox-backup.git] / src / server / worker_task.rs
index 8ef0fde755055af5d1bcdc6d5d472625b6fe6b0b..0d884ba194f717e723c7665854e0ef89e81a64bb 100644 (file)
@@ -8,7 +8,6 @@ use std::sync::{Arc, Mutex};
 use anyhow::{bail, format_err, Error};
 use futures::*;
 use lazy_static::lazy_static;
-use nix::unistd::Pid;
 use serde_json::{json, Value};
 use serde::{Serialize, Deserialize};
 use tokio::sync::oneshot;
@@ -19,52 +18,48 @@ use proxmox::tools::fs::{create_path, open_file_locked, replace_file, CreateOpti
 
 use super::UPID;
 
+use crate::buildcfg;
+use crate::server;
 use crate::tools::logrotate::{LogRotate, LogRotateFiles};
 use crate::tools::{FileLogger, FileLogOptions};
-use crate::api2::types::Userid;
+use crate::api2::types::{Authid, TaskStateType};
 
-macro_rules! PROXMOX_BACKUP_VAR_RUN_DIR_M { () => ("/run/proxmox-backup") }
-macro_rules! PROXMOX_BACKUP_LOG_DIR_M { () => ("/var/log/proxmox-backup") }
-macro_rules! PROXMOX_BACKUP_TASK_DIR_M { () => (concat!( PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks")) }
-
-pub const PROXMOX_BACKUP_VAR_RUN_DIR: &str = PROXMOX_BACKUP_VAR_RUN_DIR_M!();
-pub const PROXMOX_BACKUP_LOG_DIR: &str = PROXMOX_BACKUP_LOG_DIR_M!();
-pub const PROXMOX_BACKUP_TASK_DIR: &str = PROXMOX_BACKUP_TASK_DIR_M!();
-pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/.active.lock");
-pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/active");
-pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/index");
-pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/archive");
-
-const MAX_INDEX_TASKS: usize = 1000;
+macro_rules! taskdir {
+    ($subdir:expr) => (concat!(PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
+}
+pub const PROXMOX_BACKUP_TASK_DIR: &str = taskdir!("/");
+pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = taskdir!("/.active.lock");
+pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = taskdir!("/active");
+pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = taskdir!("/index");
+pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive");
 
 lazy_static! {
     static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
+}
 
-    static ref MY_PID: i32 = unsafe { libc::getpid() };
-    static ref MY_PID_PSTART: u64 = procfs::PidStat::read_from_pid(Pid::from_raw(*MY_PID))
-        .unwrap()
-        .starttime;
+/// checks if the task UPID refers to a worker from this process
+fn is_local_worker(upid: &UPID) -> bool {
+    upid.pid == server::pid() && upid.pstart == server::pstart()
 }
 
 /// Test if the task is still running
 pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
-    if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
+    if is_local_worker(upid) {
         return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
     }
 
-    if !procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() {
+    if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() {
         return Ok(false);
     }
 
-    let socketname = format!(
-        "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, upid.pid);
-
+    let sock = server::ctrl_sock_from_pid(upid.pid);
     let cmd = json!({
-        "command": "status",
-        "upid": upid.to_string(),
+        "command": "worker-task-status",
+        "args": {
+            "upid": upid.to_string(),
+        },
     });
-
-    let status = super::send_command(socketname, cmd).await?;
+    let status = super::send_command(sock, cmd).await?;
 
     if let Some(active) = status.as_bool() {
         Ok(active)
@@ -75,69 +70,48 @@ pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
 
 /// Test if the task is still running (fast but inaccurate implementation)
 ///
-/// If the task is spanned from a different process, we simply return if
+/// If the task is spawned from a different process, we simply return if
 /// that process is still running. This information is good enough to detect
 /// stale tasks...
 pub fn worker_is_active_local(upid: &UPID) -> bool {
-    if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
+    if is_local_worker(upid) {
         WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
     } else {
         procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
     }
 }
 
-pub fn create_task_control_socket() -> Result<(), Error> {
-
-    let socketname = format!(
-        "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, *MY_PID);
-
-    let control_future = super::create_control_socket(socketname, |param| {
-        let param = param
-            .as_object()
-            .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
-        if param.keys().count() != 2 { bail!("wrong number of parameters"); }
-
-        let command = param["command"]
-            .as_str()
-            .ok_or_else(|| format_err!("unable to parse parameters (missing command)"))?;
-
-        // we have only two commands for now
-        if !(command == "abort-task" || command == "status") {
-            bail!("got unknown command '{}'", command);
-        }
-
-        let upid_str = param["upid"]
-            .as_str()
-            .ok_or_else(|| format_err!("unable to parse parameters (missing upid)"))?;
-
-        let upid = upid_str.parse::<UPID>()?;
-
-        if !(upid.pid == *MY_PID && upid.pstart == *MY_PID_PSTART) {
+pub fn register_task_control_commands(
+    commando_sock: &mut super::CommandoSocket,
+) -> Result<(), Error> {
+    fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
+        let args = if let Some(args) = args { args } else { bail!("missing args") };
+        let upid = match args.get("upid") {
+            Some(Value::String(upid)) => upid.parse::<UPID>()?,
+            None => bail!("no upid in args"),
+            _ => bail!("unable to parse upid"),
+        };
+        if !is_local_worker(&upid) {
             bail!("upid does not belong to this process");
         }
+        Ok(upid)
+    }
 
-        let hash = WORKER_TASK_LIST.lock().unwrap();
+    commando_sock.register_command("worker-task-abort".into(), move |args| {
+        let upid = get_upid(args)?;
 
-        match command {
-            "abort-task" => {
-                if let Some(ref worker) = hash.get(&upid.task_id) {
-                    worker.request_abort();
-                } else {
-                    // assume task is already stopped
-                }
-                Ok(Value::Null)
-            }
-            "status" => {
-                let active = hash.contains_key(&upid.task_id);
-                Ok(active.into())
-            }
-            _ => {
-                bail!("got unknown command '{}'", command);
-            }
+        if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) {
+            worker.request_abort();
         }
+        Ok(Value::Null)
     })?;
+    commando_sock.register_command("worker-task-status".into(), move |args| {
+        let upid = get_upid(args)?;
+
+        let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id);
 
-    tokio::spawn(control_future);
+        Ok(active.into())
+    })?;
 
     Ok(())
 }
@@ -152,17 +126,14 @@ pub fn abort_worker_async(upid: UPID) {
 
 pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
 
-    let target_pid = upid.pid;
-
-    let socketname = format!(
-        "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, target_pid);
-
+    let sock = server::ctrl_sock_from_pid(upid.pid);
     let cmd = json!({
-        "command": "abort-task",
-        "upid": upid.to_string(),
+        "command": "worker-task-abort",
+        "args": {
+            "upid": upid.to_string(),
+        },
     });
-
-    super::send_command(socketname, cmd).map_ok(|_| ()).await
+    super::send_command(sock, cmd).map_ok(|_| ()).await
 }
 
 fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
@@ -191,9 +162,9 @@ pub fn create_task_log_dirs() -> Result<(), Error> {
             .owner(backup_user.uid)
             .group(backup_user.gid);
 
-        create_path(PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
+        create_path(buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
         create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
-        create_path(PROXMOX_BACKUP_VAR_RUN_DIR, None, Some(opts))?;
+        create_path(buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
         Ok(())
     }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
 
@@ -220,7 +191,7 @@ pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
     file.read_to_end(&mut data)?;
 
     // task logs should end with newline, we do not want it here
-    if data.len() > 0 && data[data.len()-1] == b'\n' {
+    if !data.is_empty() && data[data.len()-1] == b'\n' {
         data.pop();
     }
 
@@ -275,6 +246,15 @@ impl TaskState {
         }
     }
 
+    pub fn tasktype(&self) -> TaskStateType {
+        match self {
+            TaskState::OK { .. } => TaskStateType::OK,
+            TaskState::Unknown { .. } => TaskStateType::Unknown,
+            TaskState::Error { .. } => TaskStateType::Error,
+            TaskState::Warning { .. } => TaskStateType::Warning,
+        }
+    }
+
     fn result_text(&self) -> String {
         match self {
             TaskState::Error { message, .. } => format!("TASK ERROR: {}", message),
@@ -287,11 +267,11 @@ impl TaskState {
             Ok(TaskState::Unknown { endtime })
         } else if s == "OK" {
             Ok(TaskState::OK { endtime })
-        } else if s.starts_with("WARNINGS: ") {
-            let count: u64 = s[10..].parse()?;
+        } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") {
+            let count: u64 = warnings.parse()?;
             Ok(TaskState::Warning{ count, endtime })
-        } else if s.len() > 0 {
-            let message = if s.starts_with("ERROR: ") { &s[7..] } else { s }.to_string();
+        } else if !s.is_empty() {
+            let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string();
             Ok(TaskState::Error{ message, endtime })
         } else {
             bail!("unable to parse Task Status '{}'", s);
@@ -363,7 +343,13 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
 
     let lock = lock_task_list_files(true)?;
 
+    // TODO remove with 1.x
     let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
+    let had_index_file = !finish_list.is_empty();
+
+    // We use filter_map because one negative case wants to *move* the data into `finish_list`,
+    // clippy doesn't quite catch this!
+    #[allow(clippy::unnecessary_filter_map)]
     let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
         .into_iter()
         .filter_map(|info| {
@@ -374,10 +360,9 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
             }
 
             if !worker_is_active_local(&info.upid) {
-                println!("Detected stopped UPID {}", &info.upid_str);
+                // println!("Detected stopped task '{}'", &info.upid_str);
                 let now = proxmox::tools::time::epoch_i64();
-                let status = upid_read_status(&info.upid)
-                    .unwrap_or_else(|_| TaskState::Unknown { endtime: now });
+                let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
                 finish_list.push(TaskListInfo {
                     upid: info.upid,
                     upid_str: info.upid_str,
@@ -412,33 +397,10 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
         }
     });
 
-
-    let start = if finish_list.len() > MAX_INDEX_TASKS {
-        finish_list.len() - MAX_INDEX_TASKS
-    } else {
-        0
-    };
-
-    let end = (start+MAX_INDEX_TASKS).min(finish_list.len());
-
-    let index_raw = if end > start {
-        render_task_list(&finish_list[start..end])
-    } else {
-        "".to_string()
-    };
-
-    replace_file(
-        PROXMOX_BACKUP_INDEX_TASK_FN,
-        index_raw.as_bytes(),
-        CreateOptions::new()
-            .owner(backup_user.uid)
-            .group(backup_user.gid),
-    )?;
-
-    if !finish_list.is_empty() && start > 0 {
+    if !finish_list.is_empty() {
         match std::fs::OpenOptions::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN) {
             Ok(mut writer) => {
-                for info in &finish_list[0..start] {
+                for info in &finish_list {
                     writer.write_all(render_task_line(&info).as_bytes())?;
                 }
             },
@@ -448,6 +410,12 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
         nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
     }
 
+    // TODO Remove with 1.x
+    // for compatibility, if we had an INDEX file, we do not need it anymore
+    if had_index_file {
+        let _ = nix::unistd::unlink(PROXMOX_BACKUP_INDEX_TASK_FN);
+    }
+
     drop(lock);
 
     Ok(())
@@ -511,16 +479,9 @@ where
     read_task_file(file)
 }
 
-enum TaskFile {
-    Active,
-    Index,
-    Archive,
-    End,
-}
-
 pub struct TaskListInfoIterator {
     list: VecDeque<TaskListInfo>,
-    file: TaskFile,
+    end: bool,
     archive: Option<LogRotateFiles>,
     lock: Option<File>,
 }
@@ -535,7 +496,10 @@ impl TaskListInfoIterator {
                 .iter()
                 .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
 
-            if needs_update {
+            // TODO remove with 1.x
+            let index_exists = std::path::Path::new(PROXMOX_BACKUP_INDEX_TASK_FN).is_file();
+
+            if needs_update || index_exists {
                 drop(lock);
                 update_active_workers(None)?;
                 let lock = lock_task_list_files(false)?;
@@ -554,12 +518,11 @@ impl TaskListInfoIterator {
             Some(logrotate.files())
         };
 
-        let file = if active_only { TaskFile::End } else { TaskFile::Active };
         let lock = if active_only { None } else { Some(read_lock) };
 
         Ok(Self {
             list: active_list.into(),
-            file,
+            end: active_only,
             archive,
             lock,
         })
@@ -573,35 +536,23 @@ impl Iterator for TaskListInfoIterator {
         loop {
             if let Some(element) = self.list.pop_back() {
                 return Some(Ok(element));
+            } else if self.end {
+                    return None;
             } else {
-                match self.file {
-                    TaskFile::Active => {
-                        let index = match read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN) {
-                            Ok(index) => index,
+                if let Some(mut archive) = self.archive.take() {
+                    if let Some(file) = archive.next() {
+                        let list = match read_task_file(file) {
+                            Ok(list) => list,
                             Err(err) => return Some(Err(err)),
                         };
-                        self.list.append(&mut index.into());
-                        self.file = TaskFile::Index;
-                    },
-                    TaskFile::Index | TaskFile::Archive => {
-                        if let Some(mut archive) = self.archive.take() {
-                            if let Some(file) = archive.next() {
-                                let list = match read_task_file(file) {
-                                    Ok(list) => list,
-                                    Err(err) => return Some(Err(err)),
-                                };
-                                self.list.append(&mut list.into());
-                                self.archive = Some(archive);
-                                self.file = TaskFile::Archive;
-                                continue;
-                            }
-                        }
-                        self.file = TaskFile::End;
-                        self.lock.take();
-                        return None;
+                        self.list.append(&mut list.into());
+                        self.archive = Some(archive);
+                        continue;
                     }
-                    TaskFile::End => return None,
                 }
+
+                self.end = true;
+                self.lock.take();
             }
         }
     }
@@ -635,24 +586,15 @@ struct WorkerTaskData {
     pub abort_listeners: Vec<oneshot::Sender<()>>,
 }
 
-impl Drop for WorkerTask {
-
-    fn drop(&mut self) {
-        println!("unregister worker");
-    }
-}
-
 impl WorkerTask {
 
-    pub fn new(worker_type: &str, worker_id: Option<String>, userid: Userid, to_stdout: bool) -> Result<Arc<Self>, Error> {
-        println!("register worker");
-
-        let upid = UPID::new(worker_type, worker_id, userid)?;
+    pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: Authid, to_stdout: bool) -> Result<Arc<Self>, Error> {
+        let upid = UPID::new(worker_type, worker_id, auth_id)?;
         let task_id = upid.task_id;
 
         let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR);
 
-        path.push(format!("{:02X}", upid.pstart % 256));
+        path.push(format!("{:02X}", upid.pstart & 255));
 
         let backup_user = crate::backup::backup_user()?;
 
@@ -660,10 +602,8 @@ impl WorkerTask {
 
         path.push(upid.to_string());
 
-        println!("FILE: {:?}", path);
-
         let logger_options = FileLogOptions {
-            to_stdout: to_stdout,
+            to_stdout,
             exclusive: true,
             prefix_time: true,
             read: true,
@@ -699,14 +639,14 @@ impl WorkerTask {
     pub fn spawn<F, T>(
         worker_type: &str,
         worker_id: Option<String>,
-        userid: Userid,
+        auth_id: Authid,
         to_stdout: bool,
         f: F,
     ) -> Result<String, Error>
         where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
               T: Send + 'static + Future<Output = Result<(), Error>>,
     {
-        let worker = WorkerTask::new(worker_type, worker_id, userid, to_stdout)?;
+        let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
         let upid_str = worker.upid.to_string();
         let f = f(worker.clone());
         tokio::spawn(async move {
@@ -721,15 +661,13 @@ impl WorkerTask {
     pub fn new_thread<F>(
         worker_type: &str,
         worker_id: Option<String>,
-        userid: Userid,
+        auth_id: Authid,
         to_stdout: bool,
         f: F,
     ) -> Result<String, Error>
         where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>
     {
-        println!("register worker thread");
-
-        let worker = WorkerTask::new(worker_type, worker_id, userid, to_stdout)?;
+        let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
         let upid_str = worker.upid.to_string();
 
         let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || {