From d607b8861b5a2e33543a4c41cf72d0a8c4145343 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Tue, 9 Apr 2019 12:15:06 +0200 Subject: [PATCH] src/server/worker_task.rs: implement task control socket --- src/bin/proxmox-backup-api.rs | 13 +++++--- src/bin/proxmox-backup-proxy.rs | 9 +++++- src/server/worker_task.rs | 56 ++++++++++++++++++++++++++++----- 3 files changed, 65 insertions(+), 13 deletions(-) diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs index 37d950ae..36d1eefe 100644 --- a/src/bin/proxmox-backup-api.rs +++ b/src/bin/proxmox-backup-api.rs @@ -1,6 +1,5 @@ -extern crate proxmox_backup; - //use proxmox_backup::tools; +use proxmox_backup::try_block; use proxmox_backup::api_schema::router::*; use proxmox_backup::api_schema::config::*; use proxmox_backup::server::rest::*; @@ -33,7 +32,7 @@ fn run() -> Result<(), Error> { bail!("unable to inititialize syslog - {}", err); } - server::create_task_log_dir()?; + server::create_task_log_dirs()?; config::create_configdir()?; @@ -69,7 +68,13 @@ fn run() -> Result<(), Error> { tokio::run(lazy(|| { - if let Err(err) = server::server_state_init() { + let init_result: Result<(), Error> = try_block!({ + server::create_task_control_socket()?; + server::server_state_init()?; + Ok(()) + }); + + if let Err(err) = init_result { eprintln!("unable to start daemon - {}", err); } else { tokio::spawn(server); diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index 556b2ac3..9661cbe1 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -1,3 +1,4 @@ +use proxmox_backup::try_block; use proxmox_backup::configdir; use proxmox_backup::tools; use proxmox_backup::server; @@ -99,7 +100,13 @@ fn run() -> Result<(), Error> { tokio::run(lazy(|| { - if let Err(err) = server::server_state_init() { + let init_result: Result<(), Error> = try_block!({ + server::create_task_control_socket()?; + server::server_state_init()?; + Ok(()) + }); + + if let Err(err) = init_result { eprintln!("unable to start daemon - {}", err); } else { tokio::spawn(server); diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs index 70cf95f3..833301ce 100644 --- a/src/server/worker_task.rs +++ b/src/server/worker_task.rs @@ -9,14 +9,17 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::io::{BufRead, BufReader}; use std::fs::File; +use serde_json::Value; use super::UPID; use crate::tools::{self, FileLogger}; +macro_rules! PROXMOX_BACKUP_VAR_RUN_DIR_M { () => ("/var/run/proxmox-backup") } macro_rules! PROXMOX_BACKUP_LOG_DIR_M { () => ("/var/log/proxmox-backup") } macro_rules! PROXMOX_BACKUP_TASK_DIR_M { () => (concat!( PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks")) } +pub const PROXMOX_BACKUP_VAR_RUN_DIR: &str = PROXMOX_BACKUP_VAR_RUN_DIR_M!(); pub const PROXMOX_BACKUP_LOG_DIR: &str = PROXMOX_BACKUP_LOG_DIR_M!(); pub const PROXMOX_BACKUP_TASK_DIR: &str = PROXMOX_BACKUP_TASK_DIR_M!(); pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/.active.lock"); @@ -24,16 +27,14 @@ pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_ lazy_static! { static ref WORKER_TASK_LIST: Mutex>> = Mutex::new(HashMap::new()); + + static ref MY_PID: i32 = unsafe { libc::getpid() }; + static ref MY_PID_PSTART: u64 = tools::procfs::read_proc_pid_stat(*MY_PID).unwrap().starttime; } /// Test if the task is still running pub fn worker_is_active(upid: &UPID) -> bool { - lazy_static! { - static ref MY_PID: i32 = unsafe { libc::getpid() }; - static ref MY_PID_PSTART: u64 = tools::procfs::read_proc_pid_stat(*MY_PID).unwrap().starttime; - } - if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) { if WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) { true @@ -48,6 +49,45 @@ pub fn worker_is_active(upid: &UPID) -> bool { } } +pub fn create_task_control_socket() -> Result<(), Error> { + + let socketname = format!( + "{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, *MY_PID); + + let control_future = super::create_control_socket(socketname, true, |param| { + let param = param.as_object() + .ok_or(format_err!("unable to parse parameters (expected json object)"))?; + if param.keys().count() != 2 { bail!("worng number of parameters"); } + + let command = param.get("command") + .ok_or(format_err!("unable to parse parameters (missing command)"))?; + + // this is the only command for now + if command != "abort-task" { bail!("got unknown command '{}'", command); } + + let upid_str = param["upid"].as_str() + .ok_or(format_err!("unable to parse parameters (missing upid)"))?; + + let upid = upid_str.parse::()?; + + if !((upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART)) { + bail!("upid does not belong to this process"); + } + + let hash = WORKER_TASK_LIST.lock().unwrap(); + if let Some(ref worker) = hash.get(&upid.task_id) { + worker.request_abort(); + } else { + // assume task is already stopped + } + Ok(Value::Null) + })?; + + tokio::spawn(control_future); + + Ok(()) +} + fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, String)>), Error> { let data = line.splitn(3, ' ').collect::>(); @@ -65,7 +105,7 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, St } /// Create task log directory with correct permissions -pub fn create_task_log_dir() -> Result<(), Error> { +pub fn create_task_log_dirs() -> Result<(), Error> { try_block!({ let (backup_uid, backup_gid) = tools::getpwnam_ugid("backup")?; @@ -74,7 +114,7 @@ pub fn create_task_log_dir() -> Result<(), Error> { tools::create_dir_chown(PROXMOX_BACKUP_LOG_DIR, None, uid, gid)?; tools::create_dir_chown(PROXMOX_BACKUP_TASK_DIR, None, uid, gid)?; - + tools::create_dir_chown(PROXMOX_BACKUP_VAR_RUN_DIR, None, uid, gid)?; Ok(()) }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?; @@ -402,7 +442,7 @@ impl WorkerTask { } /// Request abort - pub fn request_abort(self) { + pub fn request_abort(&self) { self.abort_requested.store(true, Ordering::SeqCst); } -- 2.39.2