use anyhow::{bail, format_err, Error};
use futures::*;
use lazy_static::lazy_static;
-use nix::unistd::Pid;
use serde_json::{json, Value};
use serde::{Serialize, Deserialize};
use tokio::sync::oneshot;
use super::UPID;
use crate::buildcfg;
+use crate::server;
use crate::tools::logrotate::{LogRotate, LogRotateFiles};
use crate::tools::{FileLogger, FileLogOptions};
-use crate::api2::types::Authid;
+use crate::api2::types::{Authid, TaskStateType};
macro_rules! taskdir {
($subdir:expr) => (concat!(PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
lazy_static! {
static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
+}
- static ref MY_PID: i32 = unsafe { libc::getpid() };
- static ref MY_PID_PSTART: u64 = procfs::PidStat::read_from_pid(Pid::from_raw(*MY_PID))
- .unwrap()
- .starttime;
+/// checks if the task UPID refers to a worker from this process
+fn is_local_worker(upid: &UPID) -> bool {
+ upid.pid == server::pid() && upid.pstart == server::pstart()
}
/// Test if the task is still running
pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
- if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
+ if is_local_worker(upid) {
return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
}
- if !procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() {
+ if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() {
return Ok(false);
}
- let socketname = format!(
- "\0{}/proxmox-task-control-{}.sock", buildcfg::PROXMOX_BACKUP_RUN_DIR, upid.pid);
-
+ let sock = server::ctrl_sock_from_pid(upid.pid);
let cmd = json!({
- "command": "status",
- "upid": upid.to_string(),
+ "command": "worker-task-status",
+ "args": {
+ "upid": upid.to_string(),
+ },
});
-
- let status = super::send_command(socketname, cmd).await?;
+ let status = super::send_command(sock, cmd).await?;
if let Some(active) = status.as_bool() {
Ok(active)
/// Test if the task is still running (fast but inaccurate implementation)
///
-/// If the task is spanned from a different process, we simply return if
+/// If the task is spawned from a different process, we simply return if
/// that process is still running. This information is good enough to detect
/// stale tasks...
pub fn worker_is_active_local(upid: &UPID) -> bool {
- if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
+ if is_local_worker(upid) {
WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
} else {
procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
}
}
-pub fn create_task_control_socket() -> Result<(), Error> {
-
- let socketname = format!(
- "\0{}/proxmox-task-control-{}.sock", buildcfg::PROXMOX_BACKUP_RUN_DIR, *MY_PID);
-
- let control_future = super::create_control_socket(socketname, |param| {
- let param = param
- .as_object()
- .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
- if param.keys().count() != 2 { bail!("wrong number of parameters"); }
-
- let command = param["command"]
- .as_str()
- .ok_or_else(|| format_err!("unable to parse parameters (missing command)"))?;
-
- // we have only two commands for now
- if !(command == "abort-task" || command == "status") {
- bail!("got unknown command '{}'", command);
- }
-
- let upid_str = param["upid"]
- .as_str()
- .ok_or_else(|| format_err!("unable to parse parameters (missing upid)"))?;
-
- let upid = upid_str.parse::<UPID>()?;
-
- if !(upid.pid == *MY_PID && upid.pstart == *MY_PID_PSTART) {
+pub fn register_task_control_commands(
+ commando_sock: &mut super::CommandoSocket,
+) -> Result<(), Error> {
+ fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
+ let args = if let Some(args) = args { args } else { bail!("missing args") };
+ let upid = match args.get("upid") {
+ Some(Value::String(upid)) => upid.parse::<UPID>()?,
+ None => bail!("no upid in args"),
+ _ => bail!("unable to parse upid"),
+ };
+ if !is_local_worker(&upid) {
bail!("upid does not belong to this process");
}
+ Ok(upid)
+ }
- let hash = WORKER_TASK_LIST.lock().unwrap();
+ commando_sock.register_command("worker-task-abort".into(), move |args| {
+ let upid = get_upid(args)?;
- match command {
- "abort-task" => {
- if let Some(ref worker) = hash.get(&upid.task_id) {
- worker.request_abort();
- } else {
- // assume task is already stopped
- }
- Ok(Value::Null)
- }
- "status" => {
- let active = hash.contains_key(&upid.task_id);
- Ok(active.into())
- }
- _ => {
- bail!("got unknown command '{}'", command);
- }
+ if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) {
+ worker.request_abort();
}
+ Ok(Value::Null)
})?;
+ commando_sock.register_command("worker-task-status".into(), move |args| {
+ let upid = get_upid(args)?;
- tokio::spawn(control_future);
+ let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id);
+
+ Ok(active.into())
+ })?;
Ok(())
}
pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
- let target_pid = upid.pid;
-
- let socketname = format!(
- "\0{}/proxmox-task-control-{}.sock", buildcfg::PROXMOX_BACKUP_RUN_DIR, target_pid);
-
+ let sock = server::ctrl_sock_from_pid(upid.pid);
let cmd = json!({
- "command": "abort-task",
- "upid": upid.to_string(),
+ "command": "worker-task-abort",
+ "args": {
+ "upid": upid.to_string(),
+ },
});
-
- super::send_command(socketname, cmd).map_ok(|_| ()).await
+ super::send_command(sock, cmd).map_ok(|_| ()).await
}
fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
file.read_to_end(&mut data)?;
// task logs should end with newline, we do not want it here
- if data.len() > 0 && data[data.len()-1] == b'\n' {
+ if !data.is_empty() && data[data.len()-1] == b'\n' {
data.pop();
}
}
}
+ pub fn tasktype(&self) -> TaskStateType {
+ match self {
+ TaskState::OK { .. } => TaskStateType::OK,
+ TaskState::Unknown { .. } => TaskStateType::Unknown,
+ TaskState::Error { .. } => TaskStateType::Error,
+ TaskState::Warning { .. } => TaskStateType::Warning,
+ }
+ }
+
fn result_text(&self) -> String {
match self {
TaskState::Error { message, .. } => format!("TASK ERROR: {}", message),
Ok(TaskState::Unknown { endtime })
} else if s == "OK" {
Ok(TaskState::OK { endtime })
- } else if s.starts_with("WARNINGS: ") {
- let count: u64 = s[10..].parse()?;
+ } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") {
+ let count: u64 = warnings.parse()?;
Ok(TaskState::Warning{ count, endtime })
- } else if s.len() > 0 {
- let message = if s.starts_with("ERROR: ") { &s[7..] } else { s }.to_string();
+ } else if !s.is_empty() {
+ let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string();
Ok(TaskState::Error{ message, endtime })
} else {
bail!("unable to parse Task Status '{}'", s);
let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
let had_index_file = !finish_list.is_empty();
+ // We use filter_map because one negative case wants to *move* the data into `finish_list`,
+ // clippy doesn't quite catch this!
+ #[allow(clippy::unnecessary_filter_map)]
let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
.into_iter()
.filter_map(|info| {
}
if !worker_is_active_local(&info.upid) {
- println!("Detected stopped task '{}'", &info.upid_str);
+ // println!("Detected stopped task '{}'", &info.upid_str);
let now = proxmox::tools::time::epoch_i64();
- let status = upid_read_status(&info.upid)
- .unwrap_or_else(|_| TaskState::Unknown { endtime: now });
+ let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
finish_list.push(TaskListInfo {
upid: info.upid,
upid_str: info.upid_str,
path.push(upid.to_string());
let logger_options = FileLogOptions {
- to_stdout: to_stdout,
+ to_stdout,
exclusive: true,
prefix_time: true,
read: true,