]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/server/worker_task.rs
garbage_collect: call fail_on_abort to abort GV when requested.
[proxmox-backup.git] / src / server / worker_task.rs
index 6a55755a9007fa54dfc6ff52372f4156de86ca63..3d2b4a047cdd31188cfb74c59c8052fa7eff7bc9 100644 (file)
@@ -1,22 +1,21 @@
-use failure::*;
-use lazy_static::lazy_static;
-use chrono::Local;
-
-use tokio::sync::oneshot;
-use futures::*;
-use std::sync::{Arc, Mutex};
 use std::collections::HashMap;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::io::{BufRead, BufReader};
 use std::fs::File;
+use std::io::{BufRead, BufReader};
 use std::panic::UnwindSafe;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Arc, Mutex};
 
+use chrono::Local;
+use anyhow::{bail, format_err, Error};
+use futures::*;
+use lazy_static::lazy_static;
+use nix::unistd::Pid;
 use serde_json::{json, Value};
+use tokio::sync::oneshot;
 
-use proxmox::tools::{
-    try_block,
-    fs::{create_path, file_set_contents_full, CreateOptions},
-};
+use proxmox::sys::linux::procfs;
+use proxmox::try_block;
+use proxmox::tools::fs::{create_path, replace_file, CreateOptions};
 
 use super::UPID;
 
@@ -36,23 +35,18 @@ lazy_static! {
     static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
 
     static ref MY_PID: i32 = unsafe { libc::getpid() };
-    static ref MY_PID_PSTART: u64 = proxmox::sys::linux::procfs::read_proc_pid_stat(*MY_PID).unwrap().starttime;
+    static ref MY_PID_PSTART: u64 = procfs::PidStat::read_from_pid(Pid::from_raw(*MY_PID))
+        .unwrap()
+        .starttime;
 }
 
 /// Test if the task is still running
 pub fn worker_is_active(upid: &UPID) -> bool {
 
     if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
-        if WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) {
-            true
-        } else {
-            false
-        }
+        WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
     } else {
-        match proxmox::sys::linux::procfs::check_process_running_pstart(upid.pid, upid.pstart) {
-            Some(_) => true,
-            _ => false,
-        }
+        procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
     }
 }
 
@@ -63,17 +57,17 @@ pub fn create_task_control_socket() -> Result<(), Error> {
 
     let control_future = super::create_control_socket(socketname, |param| {
         let param = param.as_object()
-            .ok_or(format_err!("unable to parse parameters (expected json object)"))?;
+            .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
         if param.keys().count() != 2 { bail!("wrong number of parameters"); }
 
         let command = param.get("command")
-            .ok_or(format_err!("unable to parse parameters (missing command)"))?;
+            .ok_or_else(|| format_err!("unable to parse parameters (missing command)"))?;
 
         // this is the only command for now
         if command != "abort-task" { bail!("got unknown command '{}'", command); }
 
         let upid_str = param["upid"].as_str()
-            .ok_or(format_err!("unable to parse parameters (missing upid)"))?;
+            .ok_or_else(|| format_err!("unable to parse parameters (missing upid)"))?;
 
         let upid = upid_str.parse::<UPID>()?;
 
@@ -138,10 +132,10 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, St
 pub fn create_task_log_dirs() -> Result<(), Error> {
 
     try_block!({
-        let (backup_uid, backup_gid) = crate::tools::getpwnam_ugid("backup")?;
+        let backup_user = crate::backup::backup_user()?;
         let opts = CreateOptions::new()
-            .owner(nix::unistd::Uid::from_raw(backup_uid))
-            .group(nix::unistd::Gid::from_raw(backup_gid));
+            .owner(backup_user.uid)
+            .group(backup_user.gid);
 
         create_path(PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
         create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
@@ -207,12 +201,10 @@ pub struct TaskListInfo {
 // Returns a sorted list of known tasks,
 fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, Error> {
 
-    let (backup_uid, backup_gid) = crate::tools::getpwnam_ugid("backup")?;
-    let uid = Some(nix::unistd::Uid::from_raw(backup_uid));
-    let gid = Some(nix::unistd::Gid::from_raw(backup_gid));
+    let backup_user = crate::backup::backup_user()?;
 
     let lock = crate::tools::open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN, std::time::Duration::new(10, 0))?;
-    nix::unistd::chown(PROXMOX_BACKUP_TASK_LOCK_FN, uid, gid)?;
+    nix::unistd::chown(PROXMOX_BACKUP_TASK_LOCK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
 
     let reader = match File::open(PROXMOX_BACKUP_ACTIVE_TASK_FN) {
         Ok(f) => Some(BufReader::new(f)),
@@ -244,7 +236,8 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E
                         match state {
                             None => {
                                 println!("Detected stoped UPID {}", upid_str);
-                                let status = upid_read_status(&upid).unwrap_or(String::from("unknown"));
+                                let status = upid_read_status(&upid)
+                                    .unwrap_or_else(|_| String::from("unknown"));
                                 finish_list.push(TaskListInfo {
                                     upid, upid_str, state: Some((Local::now().timestamp(), status))
                                 });
@@ -306,7 +299,13 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E
         }
     }
 
-    file_set_contents_full(PROXMOX_BACKUP_ACTIVE_TASK_FN, raw.as_bytes(), None, uid, gid)?;
+    replace_file(
+        PROXMOX_BACKUP_ACTIVE_TASK_FN,
+        raw.as_bytes(),
+        CreateOptions::new()
+            .owner(backup_user.uid)
+            .group(backup_user.gid),
+    )?;
 
     drop(lock);
 
@@ -366,23 +365,21 @@ impl WorkerTask {
 
         path.push(format!("{:02X}", upid.pstart % 256));
 
-        let (backup_uid, backup_gid) = crate::tools::getpwnam_ugid("backup")?;
-        let uid = nix::unistd::Uid::from_raw(backup_uid);
-        let gid = nix::unistd::Gid::from_raw(backup_gid);
+        let backup_user = crate::backup::backup_user()?;
 
-        create_path(&path, None, Some(CreateOptions::new().owner(uid).group(gid)))?;
+        create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?;
 
         path.push(upid.to_string());
 
         println!("FILE: {:?}", path);
 
         let logger = FileLogger::new(&path, to_stdout)?;
-        nix::unistd::chown(&path, Some(uid), Some(gid))?;
+        nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
 
         update_active_workers(Some(&upid))?;
 
         let worker = Arc::new(Self {
-            upid: upid,
+            upid,
             abort_requested: AtomicBool::new(false),
             data: Mutex::new(WorkerTaskData {
                 logger,
@@ -438,7 +435,7 @@ impl WorkerTask {
         let worker = WorkerTask::new(worker_type, worker_id, username, to_stdout)?;
         let upid_str = worker.upid.to_string();
 
-        let _child = std::thread::spawn(move || {
+        let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || {
             let worker1 = worker.clone();
             let result = match std::panic::catch_unwind(move || f(worker1)) {
                 Ok(r) => r,
@@ -517,7 +514,7 @@ impl WorkerTask {
     /// Fail if abort was requested.
     pub fn fail_on_abort(&self) -> Result<(), Error> {
         if self.abort_requested() {
-            bail!("task '{}': abort requested - aborting task", self.upid);
+            bail!("abort requested - aborting task");
         }
         Ok(())
     }