use anyhow::{bail, format_err, Error};
use futures::*;
use lazy_static::lazy_static;
-use nix::unistd::Pid;
use serde_json::{json, Value};
use serde::{Serialize, Deserialize};
use tokio::sync::oneshot;
use super::UPID;
use crate::buildcfg;
+use crate::server;
use crate::tools::logrotate::{LogRotate, LogRotateFiles};
use crate::tools::{FileLogger, FileLogOptions};
use crate::api2::types::Authid;
lazy_static! {
static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
+}
- static ref MY_PID: i32 = unsafe { libc::getpid() };
- static ref MY_PID_PSTART: u64 = procfs::PidStat::read_from_pid(Pid::from_raw(*MY_PID))
- .unwrap()
- .starttime;
+/// checks if the task UPID refers to a worker from this process
+fn is_local_worker(upid: &UPID) -> bool {
+ upid.pid == server::pid() && upid.pstart == server::pstart()
}
/// Test if the task is still running
pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
- if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
+ if is_local_worker(upid) {
return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
}
return Ok(false);
}
- let socketname = format!(
- "\0{}/proxmox-task-control-{}.sock", buildcfg::PROXMOX_BACKUP_RUN_DIR, upid.pid);
-
+ let sock = server::ctrl_sock_from_pid(upid.pid);
let cmd = json!({
- "command": "status",
+ "command": "worker-task-status",
"upid": upid.to_string(),
});
-
- let status = super::send_command(socketname, cmd).await?;
+ let status = super::send_command(sock, cmd).await?;
if let Some(active) = status.as_bool() {
Ok(active)
/// Test if the task is still running (fast but inaccurate implementation)
///
-/// If the task is spanned from a different process, we simply return if
+/// If the task is spawned from a different process, we simply return if
/// that process is still running. This information is good enough to detect
/// stale tasks...
pub fn worker_is_active_local(upid: &UPID) -> bool {
- if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
+ if is_local_worker(upid) {
WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
} else {
procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
}
}
-pub fn create_task_control_socket() -> Result<(), Error> {
-
- let socketname = format!(
- "\0{}/proxmox-task-control-{}.sock", buildcfg::PROXMOX_BACKUP_RUN_DIR, *MY_PID);
-
- let control_future = super::create_control_socket(socketname, |param| {
- let param = param
- .as_object()
- .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
- if param.keys().count() != 2 { bail!("wrong number of parameters"); }
-
- let command = param["command"]
- .as_str()
- .ok_or_else(|| format_err!("unable to parse parameters (missing command)"))?;
-
- // we have only two commands for now
- if !(command == "abort-task" || command == "status") {
- bail!("got unknown command '{}'", command);
- }
-
- let upid_str = param["upid"]
- .as_str()
- .ok_or_else(|| format_err!("unable to parse parameters (missing upid)"))?;
-
- let upid = upid_str.parse::<UPID>()?;
-
- if !(upid.pid == *MY_PID && upid.pstart == *MY_PID_PSTART) {
+pub fn register_task_control_commands(
+ commando_sock: &mut super::CommandoSocket,
+) -> Result<(), Error> {
+ fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
+ let args = if let Some(args) = args { args } else { bail!("missing args") };
+ let upid = match args.get("upid") {
+ Some(Value::String(upid)) => upid.parse::<UPID>()?,
+ None => bail!("no upid in args"),
+ _ => bail!("unable to parse upid"),
+ };
+ if !is_local_worker(&upid) {
bail!("upid does not belong to this process");
}
+ Ok(upid)
+ }
- let hash = WORKER_TASK_LIST.lock().unwrap();
+ commando_sock.register_command("worker-task-abort".into(), move |args| {
+ let upid = get_upid(args)?;
- match command {
- "abort-task" => {
- if let Some(ref worker) = hash.get(&upid.task_id) {
- worker.request_abort();
- } else {
- // assume task is already stopped
- }
- Ok(Value::Null)
- }
- "status" => {
- let active = hash.contains_key(&upid.task_id);
- Ok(active.into())
- }
- _ => {
- bail!("got unknown command '{}'", command);
- }
+ if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) {
+ worker.request_abort();
}
+ Ok(Value::Null)
})?;
+ commando_sock.register_command("worker-task-status".into(), move |args| {
+ let upid = get_upid(args)?;
- tokio::spawn(control_future);
+ let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id);
+
+ Ok(active.into())
+ })?;
Ok(())
}
pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
- let target_pid = upid.pid;
-
- let socketname = format!(
- "\0{}/proxmox-task-control-{}.sock", buildcfg::PROXMOX_BACKUP_RUN_DIR, target_pid);
-
+ let sock = server::ctrl_sock_from_pid(upid.pid);
let cmd = json!({
- "command": "abort-task",
+ "command": "worker-task-abort",
"upid": upid.to_string(),
});
-
- super::send_command(socketname, cmd).map_ok(|_| ()).await
+ super::send_command(sock, cmd).map_ok(|_| ()).await
}
fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {