use std::panic::UnwindSafe;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
+use std::time::{SystemTime, Duration};
use anyhow::{bail, format_err, Error};
use futures::*;
use once_cell::sync::OnceCell;
use proxmox::sys::linux::procfs;
-use proxmox::try_block;
use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
-use proxmox::api::upid::UPID;
+use proxmox_lang::try_block;
+use proxmox_schema::upid::UPID;
use pbs_tools::task::WorkerTaskContext;
use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
-use crate::{CommandoSocket, FileLogger, FileLogOptions};
+use crate::{CommandSocket, FileLogger, FileLogOptions};
struct TaskListLockGuard(File);
if !worker_is_active_local(&info.upid) {
// println!("Detected stopped task '{}'", &info.upid_str);
- let now = proxmox::tools::time::epoch_i64();
+ let now = proxmox_time::epoch_i64();
let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
finish_list.push(TaskListInfo {
upid: info.upid,
&self.active_tasks_fn,
active_raw.as_bytes(),
options,
+ false,
)?;
finish_list.sort_unstable_by(|a, b| {
OFlag::O_APPEND | OFlag::O_RDWR,
&[],
options,
+ false,
)?;
for info in &finish_list {
writer.write_all(render_task_line(&info).as_bytes())?;
logrotate.rotate(size_threshold, None, max_files)
}
+/// Removes all task logs that are older than the oldest task entry in the
+/// task archive.
+///
+/// The cutoff is the end time of the first entry carrying a state in the last
+/// file yielded by `logrotate.files()`. If no such entry is found, nothing is
+/// removed. Otherwise every file below the 256 sub-directories `00`..`FF` of
+/// the task directory whose modification time lies before the cutoff is
+/// deleted. Sub-directories that do not exist are skipped.
+///
+/// `compressed` is passed through to [LogRotate::new].
+///
+/// # Errors
+///
+/// Fails if the worker task setup or the task list lock cannot be obtained,
+/// if the archive cannot be read, if a sub-directory cannot be listed for a
+/// reason other than being absent, if the mtime of a file cannot be read, or
+/// if a file cannot be removed for a reason other than already being gone.
+pub fn cleanup_old_tasks(compressed: bool) -> Result<(), Error> {
+    let setup = worker_task_setup()?;
+
+    // keep the guard alive until the function returns
+    let _lock = setup.lock_task_list_files(true)?;
+
+    let logrotate = LogRotate::new(&setup.task_archive_fn, compressed)
+        .ok_or_else(|| format_err!("could not get archive file names"))?;
+
+    let mut timestamp = None;
+    if let Some(last_file) = logrotate.files().last() {
+        let reader = BufReader::new(last_file);
+        for line in reader.lines() {
+            let line = line?;
+            // only the first line that parses and carries a state is used
+            if let Ok((_, _, Some(state))) = parse_worker_status_line(&line) {
+                timestamp = Some(state.endtime());
+                break;
+            }
+        }
+    }
+
+    fn get_modified(entry: std::fs::DirEntry) -> Result<SystemTime, std::io::Error> {
+        entry.metadata()?.modified()
+    }
+
+    if let Some(timestamp) = timestamp {
+        // `unsigned_abs` cannot overflow, unlike negating the smallest value
+        let offset = Duration::from_secs(timestamp.unsigned_abs());
+        let cutoff_time = if timestamp > 0 {
+            SystemTime::UNIX_EPOCH.checked_add(offset)
+        } else {
+            SystemTime::UNIX_EPOCH.checked_sub(offset)
+        }.ok_or_else(|| format_err!("could not calculate cutoff time"))?;
+
+        for i in 0..256 {
+            let mut path = setup.taskdir.clone();
+            path.push(format!("{:02X}", i));
+
+            let entries = match std::fs::read_dir(path) {
+                Ok(entries) => entries,
+                // a missing sub-directory holds no logs, so there is nothing to clean
+                Err(err) if err.kind() == std::io::ErrorKind::NotFound => continue,
+                Err(err) => return Err(err.into()),
+            };
+
+            for file in entries {
+                let file = file?;
+                let path = file.path();
+
+                let modified = get_modified(file)
+                    .map_err(|err| format_err!("error getting mtime for {:?}: {}", path, err))?;
+
+                if modified < cutoff_time {
+                    match std::fs::remove_file(path) {
+                        Ok(()) => {},
+                        // already gone - treat as success
+                        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {},
+                        Err(err) => bail!("could not remove file: {}", err),
+                    }
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
+
+
/// Path to the worker log file
pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
let mut iter = last_line.splitn(2, ": ");
if let Some(time_str) = iter.next() {
- if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
+ if let Ok(endtime) = proxmox_time::parse_rfc3339(time_str) {
// set the endtime even if we cannot parse the state
status = TaskState::Unknown { endtime };
if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
}
}
+/// Register task control command on a [CommandSocket].
+///
+/// This creates two commands:
+///
+/// * ``worker-task-abort <UPID>``: calls [abort_local_worker]
+///
+/// * ``worker-task-status <UPID>``: returns true or false, depending on
+/// whether the worker is running or stopped.
pub fn register_task_control_commands(
- commando_sock: &mut CommandoSocket,
+ commando_sock: &mut CommandSocket,
) -> Result<(), Error> {
fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
let args = if let Some(args) = args { args } else { bail!("missing args") };
Ok(())
}
-pub fn abort_worker_async(upid: UPID) {
+/// Try to abort a worker task, but do not wait
+///
+/// Errors (if any) are simply logged.
+pub fn abort_worker_nowait(upid: UPID) {
tokio::spawn(async move {
if let Err(err) = abort_worker(upid).await {
- eprintln!("abort worker failed - {}", err);
+ log::error!("abort worker task failed - {}", err);
}
});
}
+/// Abort a worker task
+///
+/// By sending ``worker-task-abort`` to the control socket.
pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
let sock = crate::ctrl_sock_from_pid(upid.pid);
state
}),
Err(err) => {
- eprintln!("unable to parse worker status '{}' - {}", line, err);
+ log::warn!("unable to parse worker status '{}' - {}", line, err);
continue;
}
};
read_task_file(file)
}
+/// Iterate over existing/active worker tasks
pub struct TaskListInfoIterator {
list: VecDeque<TaskListInfo>,
end: bool,
}
impl TaskListInfoIterator {
+ /// Creates a new iterator instance.
pub fn new(active_only: bool) -> Result<Self, Error> {
let setup = worker_task_setup()?;
pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
let warn_count = self.data.lock().unwrap().warn_count;
- let endtime = proxmox::tools::time::epoch_i64();
+ let endtime = proxmox_time::epoch_i64();
if let Err(err) = result {
TaskState::Error { message: err.to_string(), endtime }
/// Request abort
pub fn request_abort(&self) {
- eprintln!("set abort flag for worker {}", self.upid);
-
let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst);
if !prev_abort { // log abort one time
self.log_message(format!("received abort request ..."));
}
}
- /// Test if abort was requested.
- pub fn abort_requested(&self) -> bool {
- self.abort_requested.load(Ordering::SeqCst)
- }
-
- /// Fail if abort was requested.
- pub fn check_abort(&self) -> Result<(), Error> {
- if self.abort_requested() {
- bail!("abort requested - aborting task");
- }
- Ok(())
- }
-
/// Get a future which resolves on task abort
pub fn abort_future(&self) -> oneshot::Receiver<()> {
let (tx, rx) = oneshot::channel::<()>();
}
impl WorkerTaskContext for WorkerTask {
- fn check_abort(&self) -> Result<(), Error> {
- self.check_abort()
+
+ fn abort_requested(&self) -> bool {
+ self.abort_requested.load(Ordering::SeqCst)
+ }
+
+ fn shutdown_requested(&self) -> bool {
+ crate::shutdown_requested()
+ }
+
+ fn fail_on_shutdown(&self) -> Result<(), Error> {
+ crate::fail_on_shutdown()
}
fn log(&self, level: log::Level, message: &std::fmt::Arguments) {