pub const PROXMOX_BACKUP_TASK_DIR: &str = PROXMOX_BACKUP_TASK_DIR_M!();
pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/.active.lock");
pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/active");
+pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/index");
+
+const MAX_INDEX_TASKS: usize = 1000;
lazy_static! {
static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
let lock = lock_task_list_files(true)?;
- let reader = match File::open(PROXMOX_BACKUP_ACTIVE_TASK_FN) {
- Ok(f) => Some(BufReader::new(f)),
- Err(err) => {
- if err.kind() == std::io::ErrorKind::NotFound {
- None
- } else {
- bail!("unable to open active worker {:?} - {}", PROXMOX_BACKUP_ACTIVE_TASK_FN, err);
+ let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
+ let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
+ .into_iter()
+ .filter_map(|info| {
+ if info.state.is_some() {
+ // this can happen when the active file still includes finished tasks
+ finish_list.push(info);
+ return None;
}
- }
- };
- let mut active_list = vec![];
- let mut finish_list = vec![];
-
- if let Some(lines) = reader.map(|r| r.lines()) {
-
- for line in lines {
- let line = line?;
- match parse_worker_status_line(&line) {
- Err(err) => bail!("unable to parse active worker status '{}' - {}", line, err),
- Ok((upid_str, upid, state)) => match state {
- None if worker_is_active_local(&upid) => {
- active_list.push(TaskListInfo { upid, upid_str, state: None });
- },
- None => {
- println!("Detected stopped UPID {}", upid_str);
- let now = proxmox::tools::time::epoch_i64();
- let status = upid_read_status(&upid)
- .unwrap_or_else(|_| TaskState::Unknown { endtime: now });
- finish_list.push(TaskListInfo {
- upid, upid_str, state: Some(status)
- });
- },
- Some(status) => {
- finish_list.push(TaskListInfo {
- upid, upid_str, state: Some(status)
- })
- }
- }
+ if !worker_is_active_local(&info.upid) {
+ println!("Detected stopped UPID {}", &info.upid_str);
+ let now = proxmox::tools::time::epoch_i64();
+ let status = upid_read_status(&info.upid)
+ .unwrap_or_else(|_| TaskState::Unknown { endtime: now });
+ finish_list.push(TaskListInfo {
+ upid: info.upid,
+ upid_str: info.upid_str,
+ state: Some(status)
+ });
+ return None;
}
- }
- }
+
+ Some(info)
+ }).collect();
if let Some(upid) = new_upid {
active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
}
- // assemble list without duplicates
- // we include all active tasks,
- // and fill up to 1000 entries with finished tasks
+ let active_raw = render_task_list(&active_list);
- let max = 1000;
-
- let mut task_hash = HashMap::new();
-
- for info in active_list {
- task_hash.insert(info.upid_str.clone(), info);
- }
-
- for info in finish_list {
- if task_hash.len() > max { break; }
- if !task_hash.contains_key(&info.upid_str) {
- task_hash.insert(info.upid_str.clone(), info);
- }
- }
-
- let mut task_list: Vec<TaskListInfo> = vec![];
- for (_, info) in task_hash { task_list.push(info); }
+ replace_file(
+ PROXMOX_BACKUP_ACTIVE_TASK_FN,
+ active_raw.as_bytes(),
+ CreateOptions::new()
+ .owner(backup_user.uid)
+ .group(backup_user.gid),
+ )?;
- task_list.sort_unstable_by(|b, a| { // lastest on top
+ finish_list.sort_unstable_by(|a, b| {
match (&a.state, &b.state) {
(Some(s1), Some(s2)) => s1.cmp(&s2),
(Some(_), None) => std::cmp::Ordering::Less,
}
});
- let raw = render_task_list(&task_list[..]);
+ let start = (finish_list.len()-MAX_INDEX_TASKS).max(0);
+ let end = (start+MAX_INDEX_TASKS).min(finish_list.len());
+ let index_raw = render_task_list(&finish_list[start..end]);
replace_file(
- PROXMOX_BACKUP_ACTIVE_TASK_FN,
- raw.as_bytes(),
+ PROXMOX_BACKUP_INDEX_TASK_FN,
+ index_raw.as_bytes(),
CreateOptions::new()
.owner(backup_user.uid)
.group(backup_user.gid),
drop(lock);
- Ok(task_list)
+ finish_list.append(&mut active_list);
+ finish_list.reverse();
+ Ok(finish_list)
}
/// Returns a sorted list of known tasks
raw
}
+// note this is not locked, caller has to make sure it is
+// this will skip (and log) lines that are not valid status lines
+fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error>
+{
+ let reader = BufReader::new(reader);
+ let mut list = Vec::new();
+ for line in reader.lines() {
+ let line = line?;
+ match parse_worker_status_line(&line) {
+ Ok((upid_str, upid, state)) => list.push(TaskListInfo {
+ upid_str,
+ upid,
+ state
+ }),
+ Err(err) => {
+ eprintln!("unable to parse worker status '{}' - {}", line, err);
+ continue;
+ }
+ };
+ }
+
+ Ok(list)
+}
+
+// note this is not locked, caller has to make sure it is
+fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error>
+where
+ P: AsRef<std::path::Path> + std::fmt::Debug,
+{
+ let file = match File::open(&path) {
+ Ok(f) => f,
+ Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
+ Err(err) => bail!("unable to open task list {:?} - {}", path, err),
+ };
+
+ read_task_file(file)
+}
+
/// Launch long running worker tasks.
///
/// A worker task can either be a whole thread, or a simply tokio