-use failure::*;
-use lazy_static::lazy_static;
-use chrono::Local;
-
-use tokio::sync::oneshot;
-use futures::*;
-use std::sync::{Arc, Mutex};
use std::collections::HashMap;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::io::{BufRead, BufReader};
use std::fs::File;
+use std::io::{BufRead, BufReader};
use std::panic::UnwindSafe;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Arc, Mutex};
+use chrono::Local;
+use anyhow::{bail, format_err, Error};
+use futures::*;
+use lazy_static::lazy_static;
+use nix::unistd::Pid;
use serde_json::{json, Value};
+use tokio::sync::oneshot;
+
+use proxmox::sys::linux::procfs;
+use proxmox::try_block;
+use proxmox::tools::fs::{create_path, replace_file, CreateOptions};
use super::UPID;
-use crate::tools::{self, FileLogger};
+use crate::tools::FileLogger;
-macro_rules! PROXMOX_BACKUP_VAR_RUN_DIR_M { () => ("/var/run/proxmox-backup") }
+macro_rules! PROXMOX_BACKUP_VAR_RUN_DIR_M { () => ("/run/proxmox-backup") }
macro_rules! PROXMOX_BACKUP_LOG_DIR_M { () => ("/var/log/proxmox-backup") }
macro_rules! PROXMOX_BACKUP_TASK_DIR_M { () => (concat!( PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks")) }
static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
static ref MY_PID: i32 = unsafe { libc::getpid() };
- static ref MY_PID_PSTART: u64 = tools::procfs::read_proc_pid_stat(*MY_PID).unwrap().starttime;
+ static ref MY_PID_PSTART: u64 = procfs::PidStat::read_from_pid(Pid::from_raw(*MY_PID))
+ .unwrap()
+ .starttime;
}
/// Test if the task is still running
pub fn worker_is_active(upid: &UPID) -> bool {
if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
- if WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) {
- true
- } else {
- false
- }
+ WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
} else {
- match tools::procfs::check_process_running_pstart(upid.pid, upid.pstart) {
- Some(_) => true,
- _ => false,
- }
+ procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
}
}
let control_future = super::create_control_socket(socketname, |param| {
let param = param.as_object()
- .ok_or(format_err!("unable to parse parameters (expected json object)"))?;
+ .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
if param.keys().count() != 2 { bail!("wrong number of parameters"); }
let command = param.get("command")
- .ok_or(format_err!("unable to parse parameters (missing command)"))?;
+ .ok_or_else(|| format_err!("unable to parse parameters (missing command)"))?;
// this is the only command for now
if command != "abort-task" { bail!("got unknown command '{}'", command); }
let upid_str = param["upid"].as_str()
- .ok_or(format_err!("unable to parse parameters (missing upid)"))?;
+ .ok_or_else(|| format_err!("unable to parse parameters (missing upid)"))?;
let upid = upid_str.parse::<UPID>()?;
}
pub fn abort_worker_async(upid: UPID) {
- let task = abort_worker(upid);
-
- tokio::spawn(task.then(|res| {
- if let Err(err) = res {
+ tokio::spawn(async move {
+ if let Err(err) = abort_worker(upid).await {
eprintln!("abort worker failed - {}", err);
}
- Ok(())
- }));
+ });
}
-pub fn abort_worker(upid: UPID) -> impl Future<Item=(), Error=Error> {
+pub fn abort_worker(upid: UPID) -> impl Future<Output = Result<(), Error>> {
let target_pid = upid.pid;
"upid": upid.to_string(),
});
- super::send_command(socketname, cmd).map(|_| {})
+ super::send_command(socketname, cmd).map_ok(|_| ())
}
fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, String)>), Error> {
pub fn create_task_log_dirs() -> Result<(), Error> {
try_block!({
- let (backup_uid, backup_gid) = tools::getpwnam_ugid("backup")?;
- let uid = Some(nix::unistd::Uid::from_raw(backup_uid));
- let gid = Some(nix::unistd::Gid::from_raw(backup_gid));
-
- tools::create_dir_chown(PROXMOX_BACKUP_LOG_DIR, None, uid, gid)?;
- tools::create_dir_chown(PROXMOX_BACKUP_TASK_DIR, None, uid, gid)?;
- tools::create_dir_chown(PROXMOX_BACKUP_VAR_RUN_DIR, None, uid, gid)?;
+ let backup_user = crate::backup::backup_user()?;
+ let opts = CreateOptions::new()
+ .owner(backup_user.uid)
+ .group(backup_user.gid);
+
+ create_path(PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
+ create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
+ create_path(PROXMOX_BACKUP_VAR_RUN_DIR, None, Some(opts))?;
Ok(())
}).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
// Returns a sorted list of known tasks,
fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, Error> {
- let (backup_uid, backup_gid) = tools::getpwnam_ugid("backup")?;
- let uid = Some(nix::unistd::Uid::from_raw(backup_uid));
- let gid = Some(nix::unistd::Gid::from_raw(backup_gid));
+ let backup_user = crate::backup::backup_user()?;
- let lock = tools::open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN, std::time::Duration::new(10, 0))?;
- nix::unistd::chown(PROXMOX_BACKUP_TASK_LOCK_FN, uid, gid)?;
+ let lock = crate::tools::open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN, std::time::Duration::new(10, 0))?;
+ nix::unistd::chown(PROXMOX_BACKUP_TASK_LOCK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
let reader = match File::open(PROXMOX_BACKUP_ACTIVE_TASK_FN) {
Ok(f) => Some(BufReader::new(f)),
match state {
None => {
println!("Detected stoped UPID {}", upid_str);
- let status = upid_read_status(&upid).unwrap_or(String::from("unknown"));
+ let status = upid_read_status(&upid)
+ .unwrap_or_else(|_| String::from("unknown"));
finish_list.push(TaskListInfo {
upid, upid_str, state: Some((Local::now().timestamp(), status))
});
}
}
- tools::file_set_contents_full(PROXMOX_BACKUP_ACTIVE_TASK_FN, raw.as_bytes(), None, uid, gid)?;
+ replace_file(
+ PROXMOX_BACKUP_ACTIVE_TASK_FN,
+ raw.as_bytes(),
+ CreateOptions::new()
+ .owner(backup_user.uid)
+ .group(backup_user.gid),
+ )?;
drop(lock);
struct WorkerTaskData {
logger: FileLogger,
progress: f64, // 0..1
+ pub abort_listeners: Vec<oneshot::Sender<()>>,
}
impl Drop for WorkerTask {
path.push(format!("{:02X}", upid.pstart % 256));
- let (backup_uid, backup_gid) = tools::getpwnam_ugid("backup")?;
- let uid = Some(nix::unistd::Uid::from_raw(backup_uid));
- let gid = Some(nix::unistd::Gid::from_raw(backup_gid));
+ let backup_user = crate::backup::backup_user()?;
- tools::create_dir_chown(&path, None, uid, gid)?;
+ create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?;
path.push(upid.to_string());
println!("FILE: {:?}", path);
let logger = FileLogger::new(&path, to_stdout)?;
- nix::unistd::chown(&path, uid, gid)?;
+ nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
update_active_workers(Some(&upid))?;
let worker = Arc::new(Self {
- upid: upid,
+ upid,
abort_requested: AtomicBool::new(false),
data: Mutex::new(WorkerTaskData {
logger,
progress: 0.0,
+ abort_listeners: vec![],
}),
});
f: F,
) -> Result<String, Error>
where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
- T: Send + 'static + Future<Item=(), Error=Error>,
+ T: Send + 'static + Future<Output = Result<(), Error>>,
{
let worker = WorkerTask::new(worker_type, worker_id, username, to_stdout)?;
let upid_str = worker.upid.to_string();
-
- tokio::spawn(f(worker.clone()).then(move |result| {
- worker.log_result(result);
- Ok(())
- }));
+ let f = f(worker.clone());
+ tokio::spawn(async move {
+ let result = f.await;
+ worker.log_result(&result);
+ });
Ok(upid_str)
}
let worker = WorkerTask::new(worker_type, worker_id, username, to_stdout)?;
let upid_str = worker.upid.to_string();
- let _child = std::thread::spawn(move || {
+ let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || {
let worker1 = worker.clone();
let result = match std::panic::catch_unwind(move || f(worker1)) {
Ok(r) => r,
}
};
- worker.log_result(result);
+ worker.log_result(&result);
p.send(()).unwrap();
});
- tokio::spawn(c.then(|_| Ok(())));
+ tokio::spawn(c.map(|_| ()));
Ok(upid_str)
}
/// Log task result, remove task from running list
- pub fn log_result(&self, result: Result<(), Error>) {
+ pub fn log_result(&self, result: &Result<(), Error>) {
if let Err(err) = result {
self.log(&format!("TASK ERROR: {}", err));
pub fn request_abort(&self) {
eprintln!("set abort flag for worker {}", self.upid);
self.abort_requested.store(true, Ordering::SeqCst);
+ // noitify listeners
+ let mut data = self.data.lock().unwrap();
+ loop {
+ match data.abort_listeners.pop() {
+ None => { break; },
+ Some(ch) => {
+ let _ = ch.send(()); // ignore erros here
+ },
+ }
+ }
}
/// Test if abort was requested.
/// Fail if abort was requested.
pub fn fail_on_abort(&self) -> Result<(), Error> {
if self.abort_requested() {
- bail!("task '{}': abort requested - aborting task", self.upid);
+ bail!("abort requested - aborting task");
}
Ok(())
}
+
+ /// Get a future which resolves on task abort
+ pub fn abort_future(&self) -> oneshot::Receiver<()> {
+ let (tx, rx) = oneshot::channel::<()>();
+
+ let mut data = self.data.lock().unwrap();
+ if self.abort_requested() {
+ let _ = tx.send(());
+ } else {
+ data.abort_listeners.push(tx);
+ }
+ rx
+ }
}