]> git.proxmox.com Git - proxmox-backup.git/blobdiff - proxmox-rest-server/src/worker_task.rs
use new fsync parameter to replace_file and atomic_open_or_create
[proxmox-backup.git] / proxmox-rest-server / src / worker_task.rs
index bea43573f80ced7c7245bfd870a04ed6a17575a6..242d2de23e76206ece641efcc22d3cd370706860 100644 (file)
@@ -5,6 +5,7 @@ use std::io::{Read, Write, BufRead, BufReader};
 use std::panic::UnwindSafe;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, Mutex};
+use std::time::{SystemTime, Duration};
 
 use anyhow::{bail, format_err, Error};
 use futures::*;
@@ -16,14 +17,14 @@ use nix::fcntl::OFlag;
 use once_cell::sync::OnceCell;
 
 use proxmox::sys::linux::procfs;
-use proxmox::try_block;
 use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
-use proxmox::api::upid::UPID;
+use proxmox_lang::try_block;
+use proxmox_schema::upid::UPID;
 
 use pbs_tools::task::WorkerTaskContext;
 use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
 
-use crate::{CommandoSocket, FileLogger, FileLogOptions};
+use crate::{CommandSocket, FileLogger, FileLogOptions};
 
 struct TaskListLockGuard(File);
 
@@ -119,7 +120,7 @@ impl WorkerTaskSetup {
 
                 if !worker_is_active_local(&info.upid) {
                     // println!("Detected stopped task '{}'", &info.upid_str);
-                    let now = proxmox::tools::time::epoch_i64();
+                    let now = proxmox_time::epoch_i64();
                     let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
                     finish_list.push(TaskListInfo {
                         upid: info.upid,
@@ -145,6 +146,7 @@ impl WorkerTaskSetup {
             &self.active_tasks_fn,
             active_raw.as_bytes(),
             options,
+            false,
         )?;
 
         finish_list.sort_unstable_by(|a, b| {
@@ -165,6 +167,7 @@ impl WorkerTaskSetup {
                 OFlag::O_APPEND | OFlag::O_RDWR,
                 &[],
                 options,
+                false,
             )?;
             for info in &finish_list {
                 writer.write_all(render_task_line(&info).as_bytes())?;
@@ -218,6 +221,63 @@ pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: O
     logrotate.rotate(size_threshold, None, max_files)
 }
 
+/// removes all task logs that are older than the oldest task entry in the
+/// task archive
+pub fn cleanup_old_tasks(compressed: bool) -> Result<(), Error> {
+    let setup = worker_task_setup()?;
+
+    let _lock = setup.lock_task_list_files(true)?;
+
+    let logrotate = LogRotate::new(&setup.task_archive_fn, compressed)
+            .ok_or_else(|| format_err!("could not get archive file names"))?;
+
+    let mut timestamp = None;
+    if let Some(last_file) = logrotate.files().last() {
+        let reader = BufReader::new(last_file);
+        for line in reader.lines() {
+            let line = line?;
+            if let Ok((_, _, Some(state))) = parse_worker_status_line(&line) {
+                timestamp = Some(state.endtime());
+                break;
+            }
+        }
+    }
+
+    fn get_modified(entry: std::fs::DirEntry) -> Result<SystemTime, std::io::Error> {
+        entry.metadata()?.modified()
+    }
+
+    if let Some(timestamp) = timestamp {
+        let cutoff_time = if timestamp > 0 {
+            SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(timestamp as u64))
+        } else {
+            SystemTime::UNIX_EPOCH.checked_sub(Duration::from_secs(-timestamp as u64))
+        }.ok_or_else(|| format_err!("could not calculate cutoff time"))?;
+
+        for i in 0..256 {
+            let mut path = setup.taskdir.clone();
+            path.push(format!("{:02X}", i));
+            for file in std::fs::read_dir(path)? {
+                let file = file?;
+                let path = file.path();
+
+                let modified = get_modified(file)
+                    .map_err(|err| format_err!("error getting mtime for {:?}: {}", path, err))?;
+
+                if modified < cutoff_time {
+                    match std::fs::remove_file(path) {
+                        Ok(()) => {},
+                        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {},
+                        Err(err) => bail!("could not remove file: {}", err),
+                    }
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
+
 
 /// Path to the worker log file
 pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
@@ -262,7 +322,7 @@ pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
 
     let mut iter = last_line.splitn(2, ": ");
     if let Some(time_str) = iter.next() {
-        if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
+        if let Ok(endtime) = proxmox_time::parse_rfc3339(time_str) {
             // set the endtime even if we cannot parse the state
             status = TaskState::Unknown { endtime };
             if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
@@ -324,8 +384,16 @@ pub fn worker_is_active_local(upid: &UPID) -> bool {
     }
 }
 
+/// Register task control command on a [CommandSocket].
+///
+/// This create two commands:
+///
+/// * ``worker-task-abort <UPID>``: calls [abort_local_worker]
+///
+/// * ``worker-task-status <UPID>``: return true of false, depending on
+/// whether the worker is running or stopped.
 pub fn register_task_control_commands(
-    commando_sock: &mut CommandoSocket,
+    commando_sock: &mut CommandSocket,
 ) -> Result<(), Error> {
     fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
         let args = if let Some(args) = args { args } else { bail!("missing args") };
@@ -358,14 +426,20 @@ pub fn register_task_control_commands(
     Ok(())
 }
 
-pub fn abort_worker_async(upid: UPID) {
+/// Try to abort a worker task, but do no wait
+///
+/// Errors (if any) are simply logged.
+pub fn abort_worker_nowait(upid: UPID) {
     tokio::spawn(async move {
         if let Err(err) = abort_worker(upid).await {
-            eprintln!("abort worker failed - {}", err);
+            log::error!("abort worker task failed - {}", err);
         }
     });
 }
 
+/// Abort a worker task
+///
+/// By sending ``worker-task-abort`` to the control socket.
 pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
 
     let sock = crate::ctrl_sock_from_pid(upid.pid);
@@ -513,7 +587,7 @@ fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error>
                 state
             }),
             Err(err) => {
-                eprintln!("unable to parse worker status '{}' - {}", line, err);
+                log::warn!("unable to parse worker status '{}' - {}", line, err);
                 continue;
             }
         };
@@ -536,6 +610,7 @@ where
     read_task_file(file)
 }
 
+/// Iterate over existing/active worker tasks
 pub struct TaskListInfoIterator {
     list: VecDeque<TaskListInfo>,
     end: bool,
@@ -544,6 +619,7 @@ pub struct TaskListInfoIterator {
 }
 
 impl TaskListInfoIterator {
+    /// Creates a new iterator instance.
     pub fn new(active_only: bool) -> Result<Self, Error> {
 
         let setup = worker_task_setup()?;
@@ -765,7 +841,7 @@ impl WorkerTask {
     pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
         let warn_count = self.data.lock().unwrap().warn_count;
 
-        let endtime = proxmox::tools::time::epoch_i64();
+        let endtime = proxmox_time::epoch_i64();
 
         if let Err(err) = result {
             TaskState::Error { message: err.to_string(), endtime }
@@ -811,8 +887,6 @@ impl WorkerTask {
 
     /// Request abort
     pub fn request_abort(&self) {
-        eprintln!("set abort flag for worker {}", self.upid);
-
         let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst);
         if !prev_abort { // log abort one time
             self.log_message(format!("received abort request ..."));
@@ -829,19 +903,6 @@ impl WorkerTask {
         }
     }
 
-    /// Test if abort was requested.
-    pub fn abort_requested(&self) -> bool {
-        self.abort_requested.load(Ordering::SeqCst)
-    }
-
-    /// Fail if abort was requested.
-    pub fn check_abort(&self) -> Result<(), Error> {
-        if self.abort_requested() {
-            bail!("abort requested - aborting task");
-        }
-        Ok(())
-    }
-
     /// Get a future which resolves on task abort
     pub fn abort_future(&self) ->  oneshot::Receiver<()> {
         let (tx, rx) = oneshot::channel::<()>();
@@ -861,8 +922,17 @@ impl WorkerTask {
 }
 
 impl WorkerTaskContext for WorkerTask {
-    fn check_abort(&self) -> Result<(), Error> {
-        self.check_abort()
+
+    fn abort_requested(&self) -> bool {
+        self.abort_requested.load(Ordering::SeqCst)
+    }
+
+    fn shutdown_requested(&self) -> bool {
+        crate::shutdown_requested()
+    }
+
+    fn fail_on_shutdown(&self) -> Result<(), Error> {
+        crate::fail_on_shutdown()
     }
 
     fn log(&self, level: log::Level, message: &std::fmt::Arguments) {