]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/server/worker_task.rs
garbage_collect: call fail_on_abort to abort GV when requested.
[proxmox-backup.git] / src / server / worker_task.rs
index 6140a463b21ac1d6d906a1e7c55ed3db6d1a67f2..3d2b4a047cdd31188cfb74c59c8052fa7eff7bc9 100644 (file)
@@ -1,21 +1,27 @@
-use failure::*;
-use lazy_static::lazy_static;
-use chrono::Local;
-
-use tokio::sync::oneshot;
-use futures::*;
-use std::sync::{Arc, Mutex};
 use std::collections::HashMap;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::io::{BufRead, BufReader};
 use std::fs::File;
+use std::io::{BufRead, BufReader};
+use std::panic::UnwindSafe;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Arc, Mutex};
+
+use chrono::Local;
+use anyhow::{bail, format_err, Error};
+use futures::*;
+use lazy_static::lazy_static;
+use nix::unistd::Pid;
 use serde_json::{json, Value};
+use tokio::sync::oneshot;
+
+use proxmox::sys::linux::procfs;
+use proxmox::try_block;
+use proxmox::tools::fs::{create_path, replace_file, CreateOptions};
 
 use super::UPID;
 
-use crate::tools::{self, FileLogger};
+use crate::tools::FileLogger;
 
-macro_rules! PROXMOX_BACKUP_VAR_RUN_DIR_M { () => ("/var/run/proxmox-backup") }
+macro_rules! PROXMOX_BACKUP_VAR_RUN_DIR_M { () => ("/run/proxmox-backup") }
 macro_rules! PROXMOX_BACKUP_LOG_DIR_M { () => ("/var/log/proxmox-backup") }
 macro_rules! PROXMOX_BACKUP_TASK_DIR_M { () => (concat!( PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks")) }
 
@@ -29,23 +35,18 @@ lazy_static! {
     static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
 
     static ref MY_PID: i32 = unsafe { libc::getpid() };
-    static ref MY_PID_PSTART: u64 = tools::procfs::read_proc_pid_stat(*MY_PID).unwrap().starttime;
+    static ref MY_PID_PSTART: u64 = procfs::PidStat::read_from_pid(Pid::from_raw(*MY_PID))
+        .unwrap()
+        .starttime;
 }
 
 /// Test if the task is still running
 pub fn worker_is_active(upid: &UPID) -> bool {
 
     if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
-        if WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) {
-            true
-        } else {
-            false
-        }
+        WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
     } else {
-        match tools::procfs::check_process_running_pstart(upid.pid, upid.pstart) {
-            Some(_) => true,
-            _ => false,
-        }
+        procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
     }
 }
 
@@ -56,17 +57,17 @@ pub fn create_task_control_socket() -> Result<(), Error> {
 
     let control_future = super::create_control_socket(socketname, |param| {
         let param = param.as_object()
-            .ok_or(format_err!("unable to parse parameters (expected json object)"))?;
+            .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
         if param.keys().count() != 2 { bail!("wrong number of parameters"); }
 
         let command = param.get("command")
-            .ok_or(format_err!("unable to parse parameters (missing command)"))?;
+            .ok_or_else(|| format_err!("unable to parse parameters (missing command)"))?;
 
         // this is the only command for now
         if command != "abort-task" { bail!("got unknown command '{}'", command); }
 
         let upid_str = param["upid"].as_str()
-            .ok_or(format_err!("unable to parse parameters (missing upid)"))?;
+            .ok_or_else(|| format_err!("unable to parse parameters (missing upid)"))?;
 
         let upid = upid_str.parse::<UPID>()?;
 
@@ -89,17 +90,14 @@ pub fn create_task_control_socket() -> Result<(), Error> {
 }
 
 pub fn abort_worker_async(upid: UPID) {
-    let task = abort_worker(upid);
-
-    tokio::spawn(task.then(|res| {
-        if let Err(err) = res {
+    tokio::spawn(async move {
+        if let Err(err) = abort_worker(upid).await {
             eprintln!("abort worker failed - {}", err);
         }
-        Ok(())
-    }));
+    });
 }
 
-pub fn abort_worker(upid: UPID) -> impl Future<Item=(), Error=Error> {
+pub fn abort_worker(upid: UPID) -> impl Future<Output = Result<(), Error>> {
 
     let target_pid = upid.pid;
 
@@ -111,7 +109,7 @@ pub fn abort_worker(upid: UPID) -> impl Future<Item=(), Error=Error> {
         "upid": upid.to_string(),
     });
 
-    super::send_command(socketname, cmd).map(|_| {})
+    super::send_command(socketname, cmd).map_ok(|_| ())
 }
 
 fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, String)>), Error> {
@@ -134,13 +132,14 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, St
 pub fn create_task_log_dirs() -> Result<(), Error> {
 
     try_block!({
-        let (backup_uid, backup_gid) = tools::getpwnam_ugid("backup")?;
-        let uid = Some(nix::unistd::Uid::from_raw(backup_uid));
-        let gid = Some(nix::unistd::Gid::from_raw(backup_gid));
-
-        tools::create_dir_chown(PROXMOX_BACKUP_LOG_DIR, None, uid, gid)?;
-        tools::create_dir_chown(PROXMOX_BACKUP_TASK_DIR, None, uid, gid)?;
-        tools::create_dir_chown(PROXMOX_BACKUP_VAR_RUN_DIR, None, uid, gid)?;
+        let backup_user = crate::backup::backup_user()?;
+        let opts = CreateOptions::new()
+            .owner(backup_user.uid)
+            .group(backup_user.gid);
+
+        create_path(PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
+        create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
+        create_path(PROXMOX_BACKUP_VAR_RUN_DIR, None, Some(opts))?;
         Ok(())
     }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
 
@@ -173,7 +172,7 @@ pub fn upid_read_status(upid: &UPID) -> Result<String, Error> {
                 if rest == "OK" {
                     status = String::from(rest);
                 } else if rest.starts_with("ERROR: ") {
-                    status = String::from(rest);
+                    status = String::from(&rest[7..]);
                 }
             }
         }
@@ -202,12 +201,10 @@ pub struct TaskListInfo {
 // Returns a sorted list of known tasks,
 fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, Error> {
 
-    let (backup_uid, backup_gid) = tools::getpwnam_ugid("backup")?;
-    let uid = Some(nix::unistd::Uid::from_raw(backup_uid));
-    let gid = Some(nix::unistd::Gid::from_raw(backup_gid));
+    let backup_user = crate::backup::backup_user()?;
 
-    let lock = tools::open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN, std::time::Duration::new(10, 0))?;
-    nix::unistd::chown(PROXMOX_BACKUP_TASK_LOCK_FN, uid, gid)?;
+    let lock = crate::tools::open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN, std::time::Duration::new(10, 0))?;
+    nix::unistd::chown(PROXMOX_BACKUP_TASK_LOCK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
 
     let reader = match File::open(PROXMOX_BACKUP_ACTIVE_TASK_FN) {
         Ok(f) => Some(BufReader::new(f)),
@@ -239,7 +236,8 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E
                         match state {
                             None => {
                                 println!("Detected stoped UPID {}", upid_str);
-                                let status = upid_read_status(&upid).unwrap_or(String::from("unknown"));
+                                let status = upid_read_status(&upid)
+                                    .unwrap_or_else(|_| String::from("unknown"));
                                 finish_list.push(TaskListInfo {
                                     upid, upid_str, state: Some((Local::now().timestamp(), status))
                                 });
@@ -282,7 +280,7 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E
     let mut task_list: Vec<TaskListInfo> = vec![];
     for (_, info) in task_hash { task_list.push(info); }
 
-    task_list.sort_unstable_by(|a, b| {
+    task_list.sort_unstable_by(|b, a| { // lastest on top
         match (&a.state, &b.state) {
             (Some(s1), Some(s2)) => s1.0.cmp(&s2.0),
             (Some(_), None) => std::cmp::Ordering::Less,
@@ -301,7 +299,13 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, E
         }
     }
 
-    tools::file_set_contents_full(PROXMOX_BACKUP_ACTIVE_TASK_FN, raw.as_bytes(), None, uid, gid)?;
+    replace_file(
+        PROXMOX_BACKUP_ACTIVE_TASK_FN,
+        raw.as_bytes(),
+        CreateOptions::new()
+            .owner(backup_user.uid)
+            .group(backup_user.gid),
+    )?;
 
     drop(lock);
 
@@ -339,6 +343,7 @@ impl std::fmt::Display for WorkerTask {
 struct WorkerTaskData {
     logger: FileLogger,
     progress: f64, // 0..1
+    pub abort_listeners: Vec<oneshot::Sender<()>>,
 }
 
 impl Drop for WorkerTask {
@@ -350,7 +355,7 @@ impl Drop for WorkerTask {
 
 impl WorkerTask {
 
-    fn new(worker_type: &str, worker_id: Option<String>, username: &str, to_stdout: bool) -> Result<Arc<Self>, Error> {
+    pub fn new(worker_type: &str, worker_id: Option<String>, username: &str, to_stdout: bool) -> Result<Arc<Self>, Error> {
         println!("register worker");
 
         let upid = UPID::new(worker_type, worker_id, username)?;
@@ -360,27 +365,26 @@ impl WorkerTask {
 
         path.push(format!("{:02X}", upid.pstart % 256));
 
-        let (backup_uid, backup_gid) = tools::getpwnam_ugid("backup")?;
-        let uid = Some(nix::unistd::Uid::from_raw(backup_uid));
-        let gid = Some(nix::unistd::Gid::from_raw(backup_gid));
+        let backup_user = crate::backup::backup_user()?;
 
-        tools::create_dir_chown(&path, None, uid, gid)?;
+        create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?;
 
         path.push(upid.to_string());
 
         println!("FILE: {:?}", path);
 
         let logger = FileLogger::new(&path, to_stdout)?;
-        nix::unistd::chown(&path, uid, gid)?;
+        nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
 
         update_active_workers(Some(&upid))?;
 
         let worker = Arc::new(Self {
-            upid: upid,
+            upid,
             abort_requested: AtomicBool::new(false),
             data: Mutex::new(WorkerTaskData {
                 logger,
                 progress: 0.0,
+                abort_listeners: vec![],
             }),
         });
 
@@ -401,19 +405,15 @@ impl WorkerTask {
         f: F,
     ) -> Result<String, Error>
         where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
-              T: Send + 'static + Future<Item=(), Error=Error>,
+              T: Send + 'static + Future<Output = Result<(), Error>>,
     {
         let worker = WorkerTask::new(worker_type, worker_id, username, to_stdout)?;
-        let task_id = worker.upid.task_id;
         let upid_str = worker.upid.to_string();
-
-        tokio::spawn(f(worker.clone()).then(move |result| {
-            WORKER_TASK_LIST.lock().unwrap().remove(&task_id);
-            worker.log_result(result);
-            let _ = update_active_workers(None);
-            super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
-            Ok(())
-        }));
+        let f = f(worker.clone());
+        tokio::spawn(async move {
+            let result = f.await;
+            worker.log_result(&result);
+        });
 
         Ok(upid_str)
     }
@@ -426,36 +426,52 @@ impl WorkerTask {
         to_stdout: bool,
         f: F,
     ) -> Result<String, Error>
-        where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>
+        where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>
     {
         println!("register worker thread");
 
         let (p, c) = oneshot::channel::<()>();
 
         let worker = WorkerTask::new(worker_type, worker_id, username, to_stdout)?;
-        let task_id = worker.upid.task_id;
         let upid_str = worker.upid.to_string();
 
-        let _child = std::thread::spawn(move || {
-            let result = f(worker.clone());
-            WORKER_TASK_LIST.lock().unwrap().remove(&task_id);
-            worker.log_result(result);
-            let _ = update_active_workers(None);
+        let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || {
+            let worker1 = worker.clone();
+            let result = match std::panic::catch_unwind(move || f(worker1)) {
+                Ok(r) => r,
+                Err(panic) => {
+                    match panic.downcast::<&str>() {
+                        Ok(panic_msg) => {
+                            Err(format_err!("worker panicked: {}", panic_msg))
+                        }
+                        Err(_) => {
+                            Err(format_err!("worker panicked: unknown type."))
+                        }
+                    }
+                }
+            };
+
+            worker.log_result(&result);
             p.send(()).unwrap();
-            super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
         });
 
-        tokio::spawn(c.then(|_| Ok(())));
+        tokio::spawn(c.map(|_| ()));
 
         Ok(upid_str)
     }
 
-    fn log_result(&self, result: Result<(), Error>) {
+    /// Log task result, remove task from running list
+    pub fn log_result(&self, result: &Result<(), Error>) {
+
         if let Err(err) = result {
             self.log(&format!("TASK ERROR: {}", err));
         } else {
             self.log("TASK OK");
         }
+
+        WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
+        let _ = update_active_workers(None);
+        super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
     }
 
     /// Log a message.
@@ -476,7 +492,18 @@ impl WorkerTask {
 
     /// Request abort
     pub fn request_abort(&self) {
+        eprintln!("set abort flag for worker {}", self.upid);
         self.abort_requested.store(true, Ordering::SeqCst);
+        // noitify listeners
+        let mut data = self.data.lock().unwrap();
+        loop {
+            match data.abort_listeners.pop() {
+                None => { break; },
+                Some(ch) => {
+                    let _ = ch.send(()); // ignore erros here
+                },
+            }
+        }
     }
 
     /// Test if abort was requested.
@@ -487,8 +514,21 @@ impl WorkerTask {
     /// Fail if abort was requested.
     pub fn fail_on_abort(&self) -> Result<(), Error> {
         if self.abort_requested() {
-            bail!("task '{}': abort requested - aborting task", self.upid);
+            bail!("abort requested - aborting task");
         }
         Ok(())
     }
+
+    /// Get a future which resolves on task abort
+    pub fn abort_future(&self) ->  oneshot::Receiver<()> {
+        let (tx, rx) = oneshot::channel::<()>();
+
+        let mut data = self.data.lock().unwrap();
+        if self.abort_requested() {
+            let _ = tx.send(());
+        } else {
+            data.abort_listeners.push(tx);
+        }
+        rx
+    }
 }