use anyhow::{bail, format_err, Error};
use futures::*;
use lazy_static::lazy_static;
-use nix::unistd::Pid;
use serde_json::{json, Value};
use serde::{Serialize, Deserialize};
use tokio::sync::oneshot;
use super::UPID;
+use crate::buildcfg;
+use crate::server;
use crate::tools::logrotate::{LogRotate, LogRotateFiles};
use crate::tools::{FileLogger, FileLogOptions};
-use crate::api2::types::Userid;
+use crate::api2::types::{Authid, TaskStateType};
-macro_rules! PROXMOX_BACKUP_VAR_RUN_DIR_M { () => ("/run/proxmox-backup") }
-macro_rules! PROXMOX_BACKUP_LOG_DIR_M { () => ("/var/log/proxmox-backup") }
-macro_rules! PROXMOX_BACKUP_TASK_DIR_M { () => (concat!( PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks")) }
-
-pub const PROXMOX_BACKUP_VAR_RUN_DIR: &str = PROXMOX_BACKUP_VAR_RUN_DIR_M!();
-pub const PROXMOX_BACKUP_LOG_DIR: &str = PROXMOX_BACKUP_LOG_DIR_M!();
-pub const PROXMOX_BACKUP_TASK_DIR: &str = PROXMOX_BACKUP_TASK_DIR_M!();
-pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/.active.lock");
-pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/active");
-pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/index");
-pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/archive");
-
-const MAX_INDEX_TASKS: usize = 1000;
+// Build a path below "<log dir>/tasks" at compile time.
+//
+// NOTE(review): the local definition of PROXMOX_BACKUP_LOG_DIR_M!() is removed
+// above, so this relies on the macro being `#[macro_export]`ed from the
+// `buildcfg` module (macro_rules! exports live at the crate root, not behind
+// the `use crate::buildcfg;` import) — TODO confirm buildcfg exports it.
+macro_rules! taskdir {
+    ($subdir:expr) => (concat!(PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
+}
+// NOTE(review): unlike the old PROXMOX_BACKUP_TASK_DIR_M!(), taskdir!("/")
+// yields a trailing slash ("/tasks/") — presumably harmless for create_path
+// and PathBuf::push, but verify nothing compares this path string literally.
+pub const PROXMOX_BACKUP_TASK_DIR: &str = taskdir!("/");
+pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = taskdir!("/.active.lock");
+pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = taskdir!("/active");
+pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = taskdir!("/index");
+pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive");
lazy_static! {
+    // All running workers of *this* process, keyed by UPID task_id.
    static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
+}
-    static ref MY_PID: i32 = unsafe { libc::getpid() };
-    static ref MY_PID_PSTART: u64 = procfs::PidStat::read_from_pid(Pid::from_raw(*MY_PID))
-        .unwrap()
-        .starttime;
+/// checks if the task UPID refers to a worker from this process
+///
+/// Both the PID and the process start time (pstart) must match, so a
+/// later process that happens to reuse our PID is not mistaken for us.
+fn is_local_worker(upid: &UPID) -> bool {
+    upid.pid == server::pid() && upid.pstart == server::pstart()
}
/// Test if the task is still running
pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
- if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
+ if is_local_worker(upid) {
return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
}
- if !procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() {
+ if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() {
return Ok(false);
}
- let socketname = format!(
- "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, upid.pid);
-
+ let sock = server::ctrl_sock_from_pid(upid.pid);
let cmd = json!({
- "command": "status",
- "upid": upid.to_string(),
+ "command": "worker-task-status",
+ "args": {
+ "upid": upid.to_string(),
+ },
});
-
- let status = super::send_command(socketname, cmd).await?;
+ let status = super::send_command(sock, cmd).await?;
if let Some(active) = status.as_bool() {
Ok(active)
/// Test if the task is still running (fast but inaccurate implementation)
///
-/// If the task is spanned from a different process, we simply return if
+/// If the task is spawned from a different process, we simply return if
/// that process is still running. This information is good enough to detect
/// stale tasks...
pub fn worker_is_active_local(upid: &UPID) -> bool {
-    if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
+    if is_local_worker(upid) {
+        // Our own process: consult the in-memory worker table directly.
        WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
    } else {
+        // Foreign process: only check that the owning process is still alive;
+        // the individual task state is not queried (hence "inaccurate").
        procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
    }
}
-pub fn create_task_control_socket() -> Result<(), Error> {
-
-    let socketname = format!(
-        "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, *MY_PID);
-
-    let control_future = super::create_control_socket(socketname, |param| {
-        let param = param
-            .as_object()
-            .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
-        if param.keys().count() != 2 { bail!("wrong number of parameters"); }
-
-        let command = param["command"]
-            .as_str()
-            .ok_or_else(|| format_err!("unable to parse parameters (missing command)"))?;
-
-        // we have only two commands for now
-        if !(command == "abort-task" || command == "status") {
-            bail!("got unknown command '{}'", command);
-        }
-
-        let upid_str = param["upid"]
-            .as_str()
-            .ok_or_else(|| format_err!("unable to parse parameters (missing upid)"))?;
-
-        let upid = upid_str.parse::<UPID>()?;
-
-        if !(upid.pid == *MY_PID && upid.pstart == *MY_PID_PSTART) {
+/// Register the worker-task control commands on the given command socket.
+///
+/// Replaces the old dedicated per-process task control socket: the shared
+/// CommandoSocket now dispatches "worker-task-abort" and "worker-task-status"
+/// for workers owned by this process.
+pub fn register_task_control_commands(
+    commando_sock: &mut super::CommandoSocket,
+) -> Result<(), Error> {
+    // Shared argument parsing/validation for both commands below: extract the
+    // "upid" string from the args object and reject UPIDs of other processes.
+    fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
+        let args = if let Some(args) = args { args } else { bail!("missing args") };
+        let upid = match args.get("upid") {
+            Some(Value::String(upid)) => upid.parse::<UPID>()?,
+            None => bail!("no upid in args"),
+            _ => bail!("unable to parse upid"),
+        };
+        if !is_local_worker(&upid) {
            bail!("upid does not belong to this process");
        }
+        Ok(upid)
+    }
-        let hash = WORKER_TASK_LIST.lock().unwrap();
+    commando_sock.register_command("worker-task-abort".into(), move |args| {
+        let upid = get_upid(args)?;
-        match command {
-            "abort-task" => {
-                if let Some(ref worker) = hash.get(&upid.task_id) {
-                    worker.request_abort();
-                } else {
-                    // assume task is already stopped
-                }
-                Ok(Value::Null)
-            }
-            "status" => {
-                let active = hash.contains_key(&upid.task_id);
-                Ok(active.into())
-            }
-            _ => {
-                bail!("got unknown command '{}'", command);
-            }
+        // If the worker is not in the table, assume the task already stopped
+        // and treat the abort as a no-op.
+        if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) {
+            worker.request_abort();
        }
+        Ok(Value::Null)
    })?;
+    // Reports whether the task is still present in this process' worker table.
+    commando_sock.register_command("worker-task-status".into(), move |args| {
+        let upid = get_upid(args)?;
+
+        let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id);
-    tokio::spawn(control_future);
+        Ok(active.into())
+    })?;
    Ok(())
}
+/// Request abortion of the task identified by `upid`.
+///
+/// Sends a "worker-task-abort" command to the control socket of the process
+/// that owns the task (which may be a different process than ours).
pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
-    let target_pid = upid.pid;
-
-    let socketname = format!(
-        "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, target_pid);
-
+    let sock = server::ctrl_sock_from_pid(upid.pid);
    let cmd = json!({
-        "command": "abort-task",
-        "upid": upid.to_string(),
+        "command": "worker-task-abort",
+        "args": {
+            "upid": upid.to_string(),
+        },
    });
-
-    super::send_command(socketname, cmd).map_ok(|_| ()).await
+    super::send_command(sock, cmd).map_ok(|_| ()).await
}
fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
.owner(backup_user.uid)
.group(backup_user.gid);
- create_path(PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
+ create_path(buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
- create_path(PROXMOX_BACKUP_VAR_RUN_DIR, None, Some(opts))?;
+ create_path(buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
Ok(())
}).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
file.read_to_end(&mut data)?;
// task logs should end with newline, we do not want it here
- if data.len() > 0 && data[data.len()-1] == b'\n' {
+ if !data.is_empty() && data[data.len()-1] == b'\n' {
data.pop();
}
}
}
+    /// Map this state to its coarse [`TaskStateType`] category, discarding
+    /// the per-variant payload (endtime, warning count, error message).
+    pub fn tasktype(&self) -> TaskStateType {
+        match self {
+            TaskState::OK { .. } => TaskStateType::OK,
+            TaskState::Unknown { .. } => TaskStateType::Unknown,
+            TaskState::Error { .. } => TaskStateType::Error,
+            TaskState::Warning { .. } => TaskStateType::Warning,
+        }
+    }
+
fn result_text(&self) -> String {
match self {
TaskState::Error { message, .. } => format!("TASK ERROR: {}", message),
Ok(TaskState::Unknown { endtime })
} else if s == "OK" {
Ok(TaskState::OK { endtime })
- } else if s.starts_with("WARNINGS: ") {
- let count: u64 = s[10..].parse()?;
+ } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") {
+ let count: u64 = warnings.parse()?;
Ok(TaskState::Warning{ count, endtime })
- } else if s.len() > 0 {
- let message = if s.starts_with("ERROR: ") { &s[7..] } else { s }.to_string();
+ } else if !s.is_empty() {
+ let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string();
Ok(TaskState::Error{ message, endtime })
} else {
bail!("unable to parse Task Status '{}'", s);
let lock = lock_task_list_files(true)?;
+ // TODO remove with 1.x
let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
+ let had_index_file = !finish_list.is_empty();
+
+ // We use filter_map because one negative case wants to *move* the data into `finish_list`,
+ // clippy doesn't quite catch this!
+ #[allow(clippy::unnecessary_filter_map)]
let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
.into_iter()
.filter_map(|info| {
}
if !worker_is_active_local(&info.upid) {
- println!("Detected stopped UPID {}", &info.upid_str);
+ // println!("Detected stopped task '{}'", &info.upid_str);
let now = proxmox::tools::time::epoch_i64();
- let status = upid_read_status(&info.upid)
- .unwrap_or_else(|_| TaskState::Unknown { endtime: now });
+ let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
finish_list.push(TaskListInfo {
upid: info.upid,
upid_str: info.upid_str,
}
});
-
- let start = if finish_list.len() > MAX_INDEX_TASKS {
- finish_list.len() - MAX_INDEX_TASKS
- } else {
- 0
- };
-
- let end = (start+MAX_INDEX_TASKS).min(finish_list.len());
-
- let index_raw = if end > start {
- render_task_list(&finish_list[start..end])
- } else {
- "".to_string()
- };
-
- replace_file(
- PROXMOX_BACKUP_INDEX_TASK_FN,
- index_raw.as_bytes(),
- CreateOptions::new()
- .owner(backup_user.uid)
- .group(backup_user.gid),
- )?;
-
- if !finish_list.is_empty() && start > 0 {
+ if !finish_list.is_empty() {
match std::fs::OpenOptions::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN) {
Ok(mut writer) => {
- for info in &finish_list[0..start] {
+ for info in &finish_list {
writer.write_all(render_task_line(&info).as_bytes())?;
}
},
nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
}
+ // TODO Remove with 1.x
+ // for compatibility, if we had an INDEX file, we do not need it anymore
+ if had_index_file {
+ let _ = nix::unistd::unlink(PROXMOX_BACKUP_INDEX_TASK_FN);
+ }
+
drop(lock);
Ok(())
read_task_file(file)
}
-enum TaskFile {
-    Active,
-    Index,
-    Archive,
-    End,
-}
-
pub struct TaskListInfoIterator {
+    /// Tasks buffered from the file currently being read; drained via pop_back().
    list: VecDeque<TaskListInfo>,
-    file: TaskFile,
+    /// Set once the active list and all archive files have been exhausted
+    /// (replaces the old three-state TaskFile cursor, since the index file
+    /// is gone).
+    end: bool,
+    /// Rotated archive files still to be read, if any.
    archive: Option<LogRotateFiles>,
+    /// Task-list read lock; released when iteration ends.
    lock: Option<File>,
}
.iter()
.any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
- if needs_update {
+ // TODO remove with 1.x
+ let index_exists = std::path::Path::new(PROXMOX_BACKUP_INDEX_TASK_FN).is_file();
+
+ if needs_update || index_exists {
drop(lock);
update_active_workers(None)?;
let lock = lock_task_list_files(false)?;
Some(logrotate.files())
};
- let file = if active_only { TaskFile::End } else { TaskFile::Active };
let lock = if active_only { None } else { Some(read_lock) };
Ok(Self {
list: active_list.into(),
- file,
+ end: active_only,
archive,
lock,
})
loop {
if let Some(element) = self.list.pop_back() {
return Some(Ok(element));
+ } else if self.end {
+ return None;
} else {
- match self.file {
- TaskFile::Active => {
- let index = match read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN) {
- Ok(index) => index,
+ if let Some(mut archive) = self.archive.take() {
+ if let Some(file) = archive.next() {
+ let list = match read_task_file(file) {
+ Ok(list) => list,
Err(err) => return Some(Err(err)),
};
- self.list.append(&mut index.into());
- self.file = TaskFile::Index;
- },
- TaskFile::Index | TaskFile::Archive => {
- if let Some(mut archive) = self.archive.take() {
- if let Some(file) = archive.next() {
- let list = match read_task_file(file) {
- Ok(list) => list,
- Err(err) => return Some(Err(err)),
- };
- self.list.append(&mut list.into());
- self.archive = Some(archive);
- self.file = TaskFile::Archive;
- continue;
- }
- }
- self.file = TaskFile::End;
- self.lock.take();
- return None;
+ self.list.append(&mut list.into());
+ self.archive = Some(archive);
+ continue;
}
- TaskFile::End => return None,
}
+
+ self.end = true;
+ self.lock.take();
}
}
}
pub abort_listeners: Vec<oneshot::Sender<()>>,
}
-impl Drop for WorkerTask {
-
- fn drop(&mut self) {
- println!("unregister worker");
- }
-}
-
impl WorkerTask {
- pub fn new(worker_type: &str, worker_id: Option<String>, userid: Userid, to_stdout: bool) -> Result<Arc<Self>, Error> {
- println!("register worker");
-
- let upid = UPID::new(worker_type, worker_id, userid)?;
+ pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: Authid, to_stdout: bool) -> Result<Arc<Self>, Error> {
+ let upid = UPID::new(worker_type, worker_id, auth_id)?;
let task_id = upid.task_id;
let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR);
- path.push(format!("{:02X}", upid.pstart % 256));
+ path.push(format!("{:02X}", upid.pstart & 255));
let backup_user = crate::backup::backup_user()?;
path.push(upid.to_string());
- println!("FILE: {:?}", path);
-
let logger_options = FileLogOptions {
- to_stdout: to_stdout,
+ to_stdout,
exclusive: true,
prefix_time: true,
read: true,
pub fn spawn<F, T>(
worker_type: &str,
worker_id: Option<String>,
- userid: Userid,
+ auth_id: Authid,
to_stdout: bool,
f: F,
) -> Result<String, Error>
where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
T: Send + 'static + Future<Output = Result<(), Error>>,
{
- let worker = WorkerTask::new(worker_type, worker_id, userid, to_stdout)?;
+ let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
let upid_str = worker.upid.to_string();
let f = f(worker.clone());
tokio::spawn(async move {
pub fn new_thread<F>(
worker_type: &str,
worker_id: Option<String>,
- userid: Userid,
+ auth_id: Authid,
to_stdout: bool,
f: F,
) -> Result<String, Error>
where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>
{
- println!("register worker thread");
-
- let worker = WorkerTask::new(worker_type, worker_id, userid, to_stdout)?;
+ let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
let upid_str = worker.upid.to_string();
let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || {