git.proxmox.com Git - proxmox-backup.git/commitdiff
move worker_task.rs into proxmox-rest-server crate
authorDietmar Maurer <dietmar@proxmox.com>
Thu, 23 Sep 2021 08:09:19 +0000 (10:09 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Fri, 24 Sep 2021 08:28:17 +0000 (10:28 +0200)
Also moved pbs-datastore/src/task.rs to pbs-tools, which now depends on 'log'.

48 files changed:
pbs-datastore/src/chunk_store.rs
pbs-datastore/src/lib.rs
pbs-datastore/src/task.rs [deleted file]
pbs-tools/Cargo.toml
pbs-tools/src/lib.rs
pbs-tools/src/task.rs [new file with mode: 0644]
proxmox-rest-server/Cargo.toml
proxmox-rest-server/src/lib.rs
proxmox-rest-server/src/worker_task.rs [new file with mode: 0644]
src/acme/plugin.rs
src/api2/admin/datastore.rs
src/api2/backup/environment.rs
src/api2/backup/mod.rs
src/api2/config/acme.rs
src/api2/config/datastore.rs
src/api2/node/apt.rs
src/api2/node/certificates.rs
src/api2/node/disks/directory.rs
src/api2/node/disks/mod.rs
src/api2/node/disks/zfs.rs
src/api2/node/mod.rs
src/api2/node/network.rs
src/api2/node/services.rs
src/api2/node/tasks.rs
src/api2/pull.rs
src/api2/reader/environment.rs
src/api2/reader/mod.rs
src/api2/tape/backup.rs
src/api2/tape/drive.rs
src/api2/tape/restore.rs
src/backup/datastore.rs
src/backup/verify.rs
src/bin/proxmox-backup-api.rs
src/bin/proxmox-backup-manager.rs
src/bin/proxmox-backup-proxy.rs
src/bin/proxmox-daily-update.rs
src/bin/proxmox_backup_debug/api.rs
src/server/gc_job.rs
src/server/h2service.rs
src/server/jobstate.rs
src/server/mod.rs
src/server/prune_job.rs
src/server/pull.rs
src/server/verify_job.rs
src/server/worker_task.rs [deleted file]
src/tape/drive/mod.rs
src/tape/pool_writer/mod.rs
tests/worker-task-abort.rs

index 361cc9a2fa54462c727480e41e6f6270299a8511..5c50e4fced0d00504d963700103dc9f30e6030cf 100644 (file)
@@ -9,10 +9,9 @@ use proxmox::tools::fs::{CreateOptions, create_path, create_dir};
 
 use pbs_api_types::GarbageCollectionStatus;
 use pbs_tools::process_locker::{self, ProcessLocker};
+use pbs_tools::{task_log, task::TaskState};
 
 use crate::DataBlob;
-use crate::task_log;
-use crate::task::TaskState;
 
 /// File system based chunk store
 pub struct ChunkStore {
@@ -306,7 +305,7 @@ impl ChunkStore {
         for (entry, percentage, bad) in self.get_chunk_iterator()? {
             if last_percentage != percentage {
                 last_percentage = percentage;
-                crate::task_log!(
+                task_log!(
                     worker,
                     "processed {}% ({} chunks)",
                     percentage,
index cfe399218427d46cddd9ac8637a80f29a05bae77..5a09666bc32e3c23931f75608c99bd926c5646ee 100644 (file)
@@ -179,7 +179,6 @@ pub mod paperkey;
 pub mod prune;
 pub mod read_chunk;
 pub mod store_progress;
-pub mod task;
 
 pub mod dynamic_index;
 pub mod fixed_index;
diff --git a/pbs-datastore/src/task.rs b/pbs-datastore/src/task.rs
deleted file mode 100644 (file)
index 8cfd6fe..0000000
+++ /dev/null
@@ -1,56 +0,0 @@
-use anyhow::Error;
-
-/// `WorkerTask` methods commonly used from contexts otherwise not related to the API server.
-pub trait TaskState {
-    /// If the task should be aborted, this should fail with a reasonable error message.
-    fn check_abort(&self) -> Result<(), Error>;
-
-    /// Create a log message for this task.
-    fn log(&self, level: log::Level, message: &std::fmt::Arguments);
-}
-
-/// Convenience implementation:
-impl<T: TaskState + ?Sized> TaskState for std::sync::Arc<T> {
-    fn check_abort(&self) -> Result<(), Error> {
-        <T as TaskState>::check_abort(&*self)
-    }
-
-    fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
-        <T as TaskState>::log(&*self, level, message)
-    }
-}
-
-#[macro_export]
-macro_rules! task_error {
-    ($task:expr, $($fmt:tt)+) => {{
-        $crate::task::TaskState::log(&*$task, log::Level::Error, &format_args!($($fmt)+))
-    }};
-}
-
-#[macro_export]
-macro_rules! task_warn {
-    ($task:expr, $($fmt:tt)+) => {{
-        $crate::task::TaskState::log(&*$task, log::Level::Warn, &format_args!($($fmt)+))
-    }};
-}
-
-#[macro_export]
-macro_rules! task_log {
-    ($task:expr, $($fmt:tt)+) => {{
-        $crate::task::TaskState::log(&*$task, log::Level::Info, &format_args!($($fmt)+))
-    }};
-}
-
-#[macro_export]
-macro_rules! task_debug {
-    ($task:expr, $($fmt:tt)+) => {{
-        $crate::task::TaskState::log(&*$task, log::Level::Debug, &format_args!($($fmt)+))
-    }};
-}
-
-#[macro_export]
-macro_rules! task_trace {
-    ($task:expr, $($fmt:tt)+) => {{
-        $crate::task::TaskState::log(&*$task, log::Level::Trace, &format_args!($($fmt)+))
-    }};
-}
index f20a315e6134478e372a26f44491f00976ff7c51..d37ef8651dd717225d5f4623dab72b60b363c1a8 100644 (file)
@@ -17,6 +17,7 @@ foreign-types = "0.3"
 futures = "0.3"
 lazy_static = "1.4"
 libc = "0.2"
+log = "0.4"
 nix = "0.19.1"
 nom = "5.1"
 openssl = "0.10"
index 000591c3e934f1094641456e0aca86db645ca7ee..6c2f0ff5c1f704edd5476a6f04a035cacb880407 100644 (file)
@@ -24,6 +24,7 @@ pub mod str;
 pub mod stream;
 pub mod sync;
 pub mod sys;
+pub mod task;
 pub mod ticket;
 pub mod tokio;
 pub mod xattr;
diff --git a/pbs-tools/src/task.rs b/pbs-tools/src/task.rs
new file mode 100644 (file)
index 0000000..8cfd6fe
--- /dev/null
@@ -0,0 +1,56 @@
+use anyhow::Error;
+
+/// `WorkerTask` methods commonly used from contexts otherwise not related to the API server.
+pub trait TaskState {
+    /// If the task should be aborted, this should fail with a reasonable error message.
+    fn check_abort(&self) -> Result<(), Error>;
+
+    /// Create a log message for this task.
+    fn log(&self, level: log::Level, message: &std::fmt::Arguments);
+}
+
+/// Convenience implementation:
+impl<T: TaskState + ?Sized> TaskState for std::sync::Arc<T> {
+    fn check_abort(&self) -> Result<(), Error> {
+        <T as TaskState>::check_abort(&*self)
+    }
+
+    fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
+        <T as TaskState>::log(&*self, level, message)
+    }
+}
+
+#[macro_export]
+macro_rules! task_error {
+    ($task:expr, $($fmt:tt)+) => {{
+        $crate::task::TaskState::log(&*$task, log::Level::Error, &format_args!($($fmt)+))
+    }};
+}
+
+#[macro_export]
+macro_rules! task_warn {
+    ($task:expr, $($fmt:tt)+) => {{
+        $crate::task::TaskState::log(&*$task, log::Level::Warn, &format_args!($($fmt)+))
+    }};
+}
+
+#[macro_export]
+macro_rules! task_log {
+    ($task:expr, $($fmt:tt)+) => {{
+        $crate::task::TaskState::log(&*$task, log::Level::Info, &format_args!($($fmt)+))
+    }};
+}
+
+#[macro_export]
+macro_rules! task_debug {
+    ($task:expr, $($fmt:tt)+) => {{
+        $crate::task::TaskState::log(&*$task, log::Level::Debug, &format_args!($($fmt)+))
+    }};
+}
+
+#[macro_export]
+macro_rules! task_trace {
+    ($task:expr, $($fmt:tt)+) => {{
+        $crate::task::TaskState::log(&*$task, log::Level::Trace, &format_args!($($fmt)+))
+    }};
+}
index b02c20dbed83d78683b70c1f7d80e2dc1b726298..afaf40e1beed5b718d85a12bb93c0037e361521f 100644 (file)
@@ -15,6 +15,7 @@ lazy_static = "1.4"
 libc = "0.2"
 log = "0.4"
 nix = "0.19.1"
+once_cell = "1.3.1"
 percent-encoding = "2.1"
 regex = "1.2"
 serde = { version = "1.0", features = [] }
index 2f29f9cd72135b4662e919bc5be8740fb243d017..9acdb3fdbc6df2a4e781da5c7d4c1449bf7b138d 100644 (file)
@@ -1,9 +1,12 @@
 use std::os::unix::io::RawFd;
 
 use anyhow::{bail, format_err, Error};
+use nix::unistd::Pid;
 
 use proxmox::tools::fd::Fd;
+use proxmox::sys::linux::procfs::PidStat;
 use proxmox::api::UserInformation;
+use proxmox::tools::fs::CreateOptions;
 
 mod compression;
 pub use compression::*;
@@ -29,6 +32,9 @@ pub use api_config::ApiConfig;
 mod rest;
 pub use rest::{RestServer, handle_api_request};
 
+mod worker_task;
+pub use worker_task::*;
+
 pub enum AuthError {
     Generic(Error),
     NoData,
@@ -48,6 +54,40 @@ pub trait ApiAuth {
     ) -> Result<(String, Box<dyn UserInformation + Sync + Send>), AuthError>;
 }
 
+lazy_static::lazy_static!{
+    static ref PID: i32 = unsafe { libc::getpid() };
+    static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime;
+}
+
+pub fn pid() -> i32 {
+    *PID
+}
+
+pub fn pstart() -> u64 {
+    *PSTART
+}
+
+pub fn write_pid(pid_fn: &str) -> Result<(), Error> {
+    let pid_str = format!("{}\n", *PID);
+    proxmox::tools::fs::replace_file(pid_fn, pid_str.as_bytes(), CreateOptions::new())
+}
+
+pub fn read_pid(pid_fn: &str) -> Result<i32, Error> {
+    let pid = proxmox::tools::fs::file_get_contents(pid_fn)?;
+    let pid = std::str::from_utf8(&pid)?.trim();
+    pid.parse().map_err(|err| format_err!("could not parse pid - {}", err))
+}
+
+pub fn ctrl_sock_from_pid(pid: i32) -> String {
+    // Note: The control socket always uses @/run/proxmox-backup/ as prefix
+    // for historic reasons.
+    format!("\0{}/control-{}.sock", "/run/proxmox-backup", pid)
+}
+
+pub fn our_ctrl_sock() -> String {
+    ctrl_sock_from_pid(*PID)
+}
+
 static mut SHUTDOWN_REQUESTED: bool = false;
 
 pub fn request_shutdown() {
diff --git a/proxmox-rest-server/src/worker_task.rs b/proxmox-rest-server/src/worker_task.rs
new file mode 100644 (file)
index 0000000..b6ed686
--- /dev/null
@@ -0,0 +1,903 @@
+use std::collections::{HashMap, VecDeque};
+use std::fs::File;
+use std::path::PathBuf;
+use std::io::{Read, Write, BufRead, BufReader};
+use std::panic::UnwindSafe;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Arc, Mutex};
+
+use anyhow::{bail, format_err, Error};
+use futures::*;
+use lazy_static::lazy_static;
+use serde_json::{json, Value};
+use serde::{Serialize, Deserialize};
+use tokio::sync::oneshot;
+use nix::fcntl::OFlag;
+use once_cell::sync::OnceCell;
+
+use proxmox::sys::linux::procfs;
+use proxmox::try_block;
+use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
+use proxmox::api::upid::UPID;
+
+use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
+
+use crate::{CommandoSocket, FileLogger, FileLogOptions};
+
+struct TaskListLockGuard(File);
+
+struct WorkerTaskSetup {
+    file_opts: CreateOptions,
+    taskdir: PathBuf,
+    task_lock_fn: PathBuf,
+    active_tasks_fn: PathBuf,
+    task_index_fn: PathBuf,
+    task_archive_fn: PathBuf,
+}
+
+static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
+
+fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
+    WORKER_TASK_SETUP.get()
+        .ok_or_else(|| format_err!("WorkerTask library is not initialized"))
+}
+
+impl WorkerTaskSetup {
+
+    fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {
+
+        let mut taskdir = basedir.clone();
+        taskdir.push("tasks");
+
+        let mut task_lock_fn = taskdir.clone();
+        task_lock_fn.push(".active.lock");
+
+        let mut active_tasks_fn = taskdir.clone();
+        active_tasks_fn.push("active");
+
+        let mut task_index_fn = taskdir.clone();
+        task_index_fn.push("index");
+
+        let mut task_archive_fn = taskdir.clone();
+        task_archive_fn.push("archive");
+
+        Self {
+            file_opts,
+            taskdir,
+            task_lock_fn,
+            active_tasks_fn,
+            task_index_fn,
+            task_archive_fn,
+        }
+    }
+
+    fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {
+        let options =  self.file_opts.clone()
+            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+        let timeout = std::time::Duration::new(10, 0);
+
+        let file = proxmox::tools::fs::open_file_locked(
+            &self.task_lock_fn,
+            timeout,
+            exclusive,
+            options,
+        )?;
+
+        Ok(TaskListLockGuard(file))
+    }
+
+    fn log_path(&self, upid: &UPID) -> std::path::PathBuf {
+        let mut path = self.taskdir.clone();
+        path.push(format!("{:02X}", upid.pstart % 256));
+        path.push(upid.to_string());
+        path
+    }
+
+    // atomically read/update the task list, update status of finished tasks
+    // new_upid is added to the list when specified.
+    fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {
+
+        let lock = self.lock_task_list_files(true)?;
+
+        // TODO remove with 1.x
+        let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?;
+        let had_index_file = !finish_list.is_empty();
+
+        // We use filter_map because one negative case wants to *move* the data into `finish_list`,
+        // clippy doesn't quite catch this!
+        #[allow(clippy::unnecessary_filter_map)]
+        let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)?
+            .into_iter()
+            .filter_map(|info| {
+                if info.state.is_some() {
+                    // this can happen when the active file still includes finished tasks
+                    finish_list.push(info);
+                    return None;
+                }
+
+                if !worker_is_active_local(&info.upid) {
+                    // println!("Detected stopped task '{}'", &info.upid_str);
+                    let now = proxmox::tools::time::epoch_i64();
+                    let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
+                    finish_list.push(TaskListInfo {
+                        upid: info.upid,
+                        upid_str: info.upid_str,
+                        state: Some(status)
+                    });
+                    return None;
+                }
+
+                Some(info)
+            }).collect();
+
+        if let Some(upid) = new_upid {
+            active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
+        }
+
+        let active_raw = render_task_list(&active_list);
+
+        let options =  self.file_opts.clone()
+            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+        replace_file(
+            &self.active_tasks_fn,
+            active_raw.as_bytes(),
+            options,
+        )?;
+
+        finish_list.sort_unstable_by(|a, b| {
+            match (&a.state, &b.state) {
+                (Some(s1), Some(s2)) => s1.cmp(&s2),
+                (Some(_), None) => std::cmp::Ordering::Less,
+                (None, Some(_)) => std::cmp::Ordering::Greater,
+                _ => a.upid.starttime.cmp(&b.upid.starttime),
+            }
+        });
+
+        if !finish_list.is_empty() {
+            let options =  self.file_opts.clone()
+                .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+            let mut writer = atomic_open_or_create_file(
+                &self.task_archive_fn,
+                OFlag::O_APPEND | OFlag::O_RDWR,
+                &[],
+                options,
+            )?;
+            for info in &finish_list {
+                writer.write_all(render_task_line(&info).as_bytes())?;
+            }
+        }
+
+        // TODO Remove with 1.x
+        // for compatibility, if we had an INDEX file, we do not need it anymore
+        if had_index_file {
+            let _ = nix::unistd::unlink(&self.task_index_fn);
+        }
+
+        drop(lock);
+
+        Ok(())
+    }
+
+    // Create task log directory with correct permissions
+    fn create_task_log_dirs(&self) -> Result<(), Error> {
+
+        try_block!({
+            let dir_opts = self.file_opts.clone()
+                .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
+
+            create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?;
+            // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
+            Ok(())
+        }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
+    }
+}
+
+/// Initialize the WorkerTask library
+pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
+    let setup = WorkerTaskSetup::new(basedir, file_opts);
+    setup.create_task_log_dirs()?;
+    WORKER_TASK_SETUP.set(setup)
+        .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
+}
+
+/// checks if the Task Archive is bigger than 'size_threshold' bytes, and
+/// rotates it if it is
+pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
+
+    let setup = worker_task_setup()?;
+
+    let _lock = setup.lock_task_list_files(true)?;
+
+    let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress)
+            .ok_or_else(|| format_err!("could not get archive file names"))?;
+
+    logrotate.rotate(size_threshold, None, max_files)
+}
+
+
+/// Path to the worker log file
+pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
+    let setup = worker_task_setup()?;
+    Ok(setup.log_path(upid))
+}
+
+/// Read endtime (time of last log line) and exitstatus from task log file
+/// If there is not a single line with at valid datetime, we assume the
+/// starttime to be the endtime
+pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
+
+    let setup = worker_task_setup()?;
+
+    let mut status = TaskState::Unknown { endtime: upid.starttime };
+
+    let path = setup.log_path(upid);
+
+    let mut file = File::open(path)?;
+
+    /// speedup - only read tail
+    use std::io::Seek;
+    use std::io::SeekFrom;
+    let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
+
+    let mut data = Vec::with_capacity(8192);
+    file.read_to_end(&mut data)?;
+
+    // strip newlines at the end of the task logs
+    while data.last() == Some(&b'\n') {
+        data.pop();
+    }
+
+    let last_line = match data.iter().rposition(|c| *c == b'\n') {
+        Some(start) if data.len() > (start+1) => &data[start+1..],
+        Some(_) => &data, // should not happen, since we removed all trailing newlines
+        None => &data,
+    };
+
+    let last_line = std::str::from_utf8(last_line)
+        .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
+
+    let mut iter = last_line.splitn(2, ": ");
+    if let Some(time_str) = iter.next() {
+        if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
+            // set the endtime even if we cannot parse the state
+            status = TaskState::Unknown { endtime };
+            if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
+                if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
+                    status = state;
+                }
+            }
+        }
+    }
+
+    Ok(status)
+}
+
+lazy_static! {
+    static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
+}
+
+/// checks if the task UPID refers to a worker from this process
+fn is_local_worker(upid: &UPID) -> bool {
+    upid.pid == crate::pid() && upid.pstart == crate::pstart()
+}
+
+/// Test if the task is still running
+pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
+    if is_local_worker(upid) {
+        return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
+    }
+
+    if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() {
+        return Ok(false);
+    }
+
+    let sock = crate::ctrl_sock_from_pid(upid.pid);
+    let cmd = json!({
+        "command": "worker-task-status",
+        "args": {
+            "upid": upid.to_string(),
+        },
+    });
+    let status = crate::send_command(sock, &cmd).await?;
+
+    if let Some(active) = status.as_bool() {
+        Ok(active)
+    } else {
+        bail!("got unexpected result {:?} (expected bool)", status);
+    }
+}
+
+/// Test if the task is still running (fast but inaccurate implementation)
+///
+/// If the task is spawned from a different process, we simply return if
+/// that process is still running. This information is good enough to detect
+/// stale tasks...
+pub fn worker_is_active_local(upid: &UPID) -> bool {
+    if is_local_worker(upid) {
+        WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
+    } else {
+        procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
+    }
+}
+
+pub fn register_task_control_commands(
+    commando_sock: &mut CommandoSocket,
+) -> Result<(), Error> {
+    fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
+        let args = if let Some(args) = args { args } else { bail!("missing args") };
+        let upid = match args.get("upid") {
+            Some(Value::String(upid)) => upid.parse::<UPID>()?,
+            None => bail!("no upid in args"),
+            _ => bail!("unable to parse upid"),
+        };
+        if !is_local_worker(&upid) {
+            bail!("upid does not belong to this process");
+        }
+        Ok(upid)
+    }
+
+    commando_sock.register_command("worker-task-abort".into(), move |args| {
+        let upid = get_upid(args)?;
+
+        abort_local_worker(upid);
+
+        Ok(Value::Null)
+    })?;
+    commando_sock.register_command("worker-task-status".into(), move |args| {
+        let upid = get_upid(args)?;
+
+        let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id);
+
+        Ok(active.into())
+    })?;
+
+    Ok(())
+}
+
+pub fn abort_worker_async(upid: UPID) {
+    tokio::spawn(async move {
+        if let Err(err) = abort_worker(upid).await {
+            eprintln!("abort worker failed - {}", err);
+        }
+    });
+}
+
+pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
+
+    let sock = crate::ctrl_sock_from_pid(upid.pid);
+    let cmd = json!({
+        "command": "worker-task-abort",
+        "args": {
+            "upid": upid.to_string(),
+        },
+    });
+    crate::send_command(sock, &cmd).map_ok(|_| ()).await
+}
+
+fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
+
+    let data = line.splitn(3, ' ').collect::<Vec<&str>>();
+
+    let len = data.len();
+
+    match len {
+        1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)),
+        3 => {
+            let endtime = i64::from_str_radix(data[1], 16)?;
+            let state = TaskState::from_endtime_and_message(endtime, data[2])?;
+            Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state)))
+        }
+        _ => bail!("wrong number of components"),
+    }
+}
+
+/// Task State
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
+pub enum TaskState {
+    /// The Task ended with an undefined state
+    Unknown { endtime: i64 },
+    /// The Task ended and there were no errors or warnings
+    OK { endtime: i64 },
+    /// The Task had 'count' amount of warnings and no errors
+    Warning { count: u64, endtime: i64 },
+    /// The Task ended with the error described in 'message'
+    Error { message: String, endtime: i64 },
+}
+
+impl TaskState {
+    pub fn endtime(&self) -> i64 {
+        match *self {
+            TaskState::Unknown { endtime } => endtime,
+            TaskState::OK { endtime } => endtime,
+            TaskState::Warning { endtime, .. } => endtime,
+            TaskState::Error { endtime, .. } => endtime,
+        }
+    }
+
+    fn result_text(&self) -> String {
+        match self {
+            TaskState::Error { message, .. } => format!("TASK ERROR: {}", message),
+            other => format!("TASK {}", other),
+        }
+    }
+
+    fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> {
+        if s == "unknown" {
+            Ok(TaskState::Unknown { endtime })
+        } else if s == "OK" {
+            Ok(TaskState::OK { endtime })
+        } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") {
+            let count: u64 = warnings.parse()?;
+            Ok(TaskState::Warning{ count, endtime })
+        } else if !s.is_empty() {
+            let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string();
+            Ok(TaskState::Error{ message, endtime })
+        } else {
+            bail!("unable to parse Task Status '{}'", s);
+        }
+    }
+}
+
+impl std::cmp::PartialOrd for TaskState {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.endtime().cmp(&other.endtime()))
+    }
+}
+
+impl std::cmp::Ord for TaskState {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.endtime().cmp(&other.endtime())
+    }
+}
+
+impl std::fmt::Display for TaskState {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            TaskState::Unknown { .. } => write!(f, "unknown"),
+            TaskState::OK { .. }=> write!(f, "OK"),
+            TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count),
+            TaskState::Error { message, .. } => write!(f, "{}", message),
+        }
+    }
+}
+
+/// Task details including parsed UPID
+///
+/// If there is no `state`, the task is still running.
+#[derive(Debug)]
+pub struct TaskListInfo {
+    /// The parsed UPID
+    pub upid: UPID,
+    /// UPID string representation
+    pub upid_str: String,
+    /// Task `(endtime, status)` if already finished
+    pub state: Option<TaskState>, // endtime, status
+}
+
+fn render_task_line(info: &TaskListInfo) -> String {
+    let mut raw = String::new();
+    if let Some(status) = &info.state {
+        raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status));
+    } else {
+        raw.push_str(&info.upid_str);
+        raw.push('\n');
+    }
+
+    raw
+}
+
+fn render_task_list(list: &[TaskListInfo]) -> String {
+    let mut raw = String::new();
+    for info in list {
+        raw.push_str(&render_task_line(&info));
+    }
+    raw
+}
+
+// note this is not locked, caller has to make sure it is
+// this will skip (and log) lines that are not valid status lines
+fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error>
+{
+    let reader = BufReader::new(reader);
+    let mut list = Vec::new();
+    for line in reader.lines() {
+        let line = line?;
+        match parse_worker_status_line(&line) {
+            Ok((upid_str, upid, state)) => list.push(TaskListInfo {
+                upid_str,
+                upid,
+                state
+            }),
+            Err(err) => {
+                eprintln!("unable to parse worker status '{}' - {}", line, err);
+                continue;
+            }
+        };
+    }
+
+    Ok(list)
+}
+
+// note this is not locked, caller has to make sure it is
+fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error>
+where
+    P: AsRef<std::path::Path> + std::fmt::Debug,
+{
+    let file = match File::open(&path) {
+        Ok(f) => f,
+        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
+        Err(err) => bail!("unable to open task list {:?} - {}", path, err),
+    };
+
+    read_task_file(file)
+}
+
+pub struct TaskListInfoIterator {
+    list: VecDeque<TaskListInfo>,
+    end: bool,
+    archive: Option<LogRotateFiles>,
+    lock: Option<TaskListLockGuard>,
+}
+
+impl TaskListInfoIterator {
+    pub fn new(active_only: bool) -> Result<Self, Error> {
+
+        let setup = worker_task_setup()?;
+
+        let (read_lock, active_list) = {
+            let lock = setup.lock_task_list_files(false)?;
+            let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
+
+            let needs_update = active_list
+                .iter()
+                .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
+
+            // TODO remove with 1.x
+            let index_exists = setup.task_index_fn.is_file();
+
+            if needs_update || index_exists {
+                drop(lock);
+                setup.update_active_workers(None)?;
+                let lock = setup.lock_task_list_files(false)?;
+                let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
+                (lock, active_list)
+            } else {
+                (lock, active_list)
+            }
+        };
+
+        let archive = if active_only {
+            None
+        } else {
+            let logrotate = LogRotate::new(&setup.task_archive_fn, true)
+                .ok_or_else(|| format_err!("could not get archive file names"))?;
+            Some(logrotate.files())
+        };
+
+        let lock = if active_only { None } else { Some(read_lock) };
+
+        Ok(Self {
+            list: active_list.into(),
+            end: active_only,
+            archive,
+            lock,
+        })
+    }
+}
+
+impl Iterator for TaskListInfoIterator {
+    type Item = Result<TaskListInfo, Error>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        loop {
+            if let Some(element) = self.list.pop_back() {
+                return Some(Ok(element));
+            } else if self.end {
+                    return None;
+            } else {
+                if let Some(mut archive) = self.archive.take() {
+                    if let Some(file) = archive.next() {
+                        let list = match read_task_file(file) {
+                            Ok(list) => list,
+                            Err(err) => return Some(Err(err)),
+                        };
+                        self.list.append(&mut list.into());
+                        self.archive = Some(archive);
+                        continue;
+                    }
+                }
+
+                self.end = true;
+                self.lock.take();
+            }
+        }
+    }
+}
+
+/// Launch long running worker tasks.
+///
+/// A worker task can either be a whole thread, or a simply tokio
+/// task/future. Each task can `log()` messages, which are stored
+/// persistently to files. Task should poll the `abort_requested`
+/// flag, and stop execution when requested.
+pub struct WorkerTask {
+    setup: &'static WorkerTaskSetup,
+    upid: UPID,
+    data: Mutex<WorkerTaskData>,
+    abort_requested: AtomicBool,
+}
+
+impl std::fmt::Display for WorkerTask {
+
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        self.upid.fmt(f)
+    }
+}
+
+struct WorkerTaskData {
+    logger: FileLogger,
+    progress: f64, // 0..1
+    warn_count: u64,
+    pub abort_listeners: Vec<oneshot::Sender<()>>,
+}
+
+impl WorkerTask {
+
+    /// Create a new worker task.
+    ///
+    /// Allocates a fresh UPID, creates the task log file below the
+    /// configured task directory, inserts the task into the global
+    /// running-worker list and records it in the active worker index.
+    pub fn new(
+        worker_type: &str,
+        worker_id: Option<String>,
+        auth_id: String,
+        to_stdout: bool,
+    ) -> Result<Arc<Self>, Error> {
+
+        let setup = worker_task_setup()?;
+
+        let upid = UPID::new(worker_type, worker_id, auth_id)?;
+        let task_id = upid.task_id;
+
+        let mut path = setup.taskdir.clone();
+
+        // shard task logs into up to 256 subdirectories, keyed by the
+        // low byte of the process start time
+        path.push(format!("{:02X}", upid.pstart & 255));
+
+        let dir_opts = setup.file_opts.clone()
+            .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
+
+        create_path(&path, None, Some(dir_opts))?;
+
+        path.push(upid.to_string());
+
+        let logger_options = FileLogOptions {
+            to_stdout,
+            exclusive: true,
+            prefix_time: true,
+            read: true,
+            file_opts: setup.file_opts.clone(),
+            ..Default::default()
+        };
+        let logger = FileLogger::new(&path, logger_options)?;
+
+        let worker = Arc::new(Self {
+            setup,
+            upid: upid.clone(),
+            abort_requested: AtomicBool::new(false),
+            data: Mutex::new(WorkerTaskData {
+                logger,
+                progress: 0.0,
+                warn_count: 0,
+                abort_listeners: vec![],
+            }),
+        });
+
+        // scope to drop the lock again after inserting
+        {
+            let mut hash = WORKER_TASK_LIST.lock().unwrap();
+            hash.insert(task_id, worker.clone());
+            crate::set_worker_count(hash.len());
+        }
+
+        setup.update_active_workers(Some(&upid))?;
+
+        Ok(worker)
+    }
+
+    /// Spawn a new tokio task/future.
+    ///
+    /// Returns the UPID string immediately; the future's result is
+    /// recorded via `log_result()` when it completes.
+    pub fn spawn<F, T>(
+        worker_type: &str,
+        worker_id: Option<String>,
+        auth_id: String,
+        to_stdout: bool,
+        f: F,
+    ) -> Result<String, Error>
+        where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
+              T: Send + 'static + Future<Output = Result<(), Error>>,
+    {
+        let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
+        let upid_str = worker.upid.to_string();
+        let f = f(worker.clone());
+        tokio::spawn(async move {
+            let result = f.await;
+            worker.log_result(&result);
+        });
+
+        Ok(upid_str)
+    }
+
+    /// Create a new worker thread.
+    ///
+    /// Returns the UPID string immediately; the closure's result is
+    /// recorded via `log_result()` when the thread finishes.
+    pub fn new_thread<F>(
+        worker_type: &str,
+        worker_id: Option<String>,
+        auth_id: String,
+        to_stdout: bool,
+        f: F,
+    ) -> Result<String, Error>
+        where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>
+    {
+        let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
+        let upid_str = worker.upid.to_string();
+
+        let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || {
+            let worker1 = worker.clone();
+            // turn a panic inside the worker closure into a task error
+            // so it still gets logged and the task is cleaned up
+            let result = match std::panic::catch_unwind(move || f(worker1)) {
+                Ok(r) => r,
+                Err(panic) => {
+                    match panic.downcast::<&str>() {
+                        Ok(panic_msg) => {
+                            Err(format_err!("worker panicked: {}", panic_msg))
+                        }
+                        Err(_) => {
+                            Err(format_err!("worker panicked: unknown type."))
+                        }
+                    }
+                }
+            };
+
+            worker.log_result(&result);
+        });
+
+        Ok(upid_str)
+    }
+
+    /// Compute the final `TaskState` from the given result and the
+    /// recorded warning count.
+    pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
+        let warn_count = self.data.lock().unwrap().warn_count;
+
+        let endtime = proxmox::tools::time::epoch_i64();
+
+        if let Err(err) = result {
+            TaskState::Error { message: err.to_string(), endtime }
+        } else if warn_count > 0 {
+            TaskState::Warning { count: warn_count, endtime }
+        } else {
+            TaskState::OK { endtime }
+        }
+    }
+
+    /// Log task result, remove task from running list
+    pub fn log_result(&self, result: &Result<(), Error>) {
+        let state = self.create_state(result);
+        self.log(state.result_text());
+
+        WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
+        let _ = self.setup.update_active_workers(None);
+        crate::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
+    }
+
+    /// Log a message.
+    pub fn log<S: AsRef<str>>(&self, msg: S) {
+        let mut data = self.data.lock().unwrap();
+        data.logger.log(msg);
+    }
+
+    /// Log a message as warning (also bumps the warning count).
+    pub fn warn<S: AsRef<str>>(&self, msg: S) {
+        let mut data = self.data.lock().unwrap();
+        data.logger.log(format!("WARN: {}", msg.as_ref()));
+        data.warn_count += 1;
+    }
+
+    /// Set progress indicator (values outside 0..1 are silently ignored)
+    pub fn progress(&self, progress: f64) {
+        if progress >= 0.0 && progress <= 1.0 {
+            let mut data = self.data.lock().unwrap();
+            data.progress = progress;
+        } else {
+           // fixme:  log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
+        }
+    }
+
+    /// Request abort
+    pub fn request_abort(&self) {
+        eprintln!("set abort flag for worker {}", self.upid);
+
+        let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst);
+        if !prev_abort { // log abort one time
+            self.log(format!("received abort request ..."));
+        }
+        // notify listeners
+        let mut data = self.data.lock().unwrap();
+        loop {
+            match data.abort_listeners.pop() {
+                None => { break; },
+                Some(ch) => {
+                    let _ = ch.send(()); // ignore errors here
+                },
+            }
+        }
+    }
+
+    /// Test if abort was requested.
+    pub fn abort_requested(&self) -> bool {
+        self.abort_requested.load(Ordering::SeqCst)
+    }
+
+    /// Fail if abort was requested.
+    pub fn fail_on_abort(&self) -> Result<(), Error> {
+        if self.abort_requested() {
+            bail!("abort requested - aborting task");
+        }
+        Ok(())
+    }
+
+    /// Get a future which resolves on task abort
+    pub fn abort_future(&self) ->  oneshot::Receiver<()> {
+        let (tx, rx) = oneshot::channel::<()>();
+
+        let mut data = self.data.lock().unwrap();
+        // resolve immediately if abort was already requested
+        if self.abort_requested() {
+            let _ = tx.send(());
+        } else {
+            data.abort_listeners.push(tx);
+        }
+        rx
+    }
+
+    /// Get the worker's unique process ID.
+    pub fn upid(&self) -> &UPID {
+        &self.upid
+    }
+}
+
+impl pbs_tools::task::TaskState for WorkerTask {
+    fn check_abort(&self) -> Result<(), Error> {
+        self.fail_on_abort()
+    }
+
+    fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
+        match level {
+            // there is no dedicated error channel here, so Error is
+            // recorded via warn() just like Warn
+            log::Level::Error => self.warn(&message.to_string()),
+            log::Level::Warn => self.warn(&message.to_string()),
+            log::Level::Info => self.log(&message.to_string()),
+            log::Level::Debug => self.log(&format!("DEBUG: {}", message)),
+            log::Level::Trace => self.log(&format!("TRACE: {}", message)),
+        }
+    }
+}
+
+/// Wait for a locally spawned worker task
+///
+/// Note: local workers should print logs to stdout, so there is no
+/// need to fetch/display logs. We just wait for the worker to finish.
+pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
+
+    let upid: UPID = upid_str.parse()?;
+
+    // poll the local running-worker list every 100ms
+    let sleep_duration = core::time::Duration::new(0, 100_000_000);
+
+    loop {
+        if worker_is_active_local(&upid) {
+            tokio::time::sleep(sleep_duration).await;
+        } else {
+            break;
+        }
+    }
+    Ok(())
+}
+
+/// Request abort of a local worker (if existing and running)
+pub fn abort_local_worker(upid: UPID) {
+    // only workers still present in the running task list can be aborted
+    if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) {
+        worker.request_abort();
+    }
+}
index 7593aaa4392880d0f0b398eb216be385970ef9f8..cb7de08218cbbee754261d8d0df5ddabac077fcd 100644 (file)
@@ -13,7 +13,7 @@ use proxmox_acme_rs::{Authorization, Challenge};
 
 use crate::acme::AcmeClient;
 use crate::api2::types::AcmeDomain;
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 use crate::config::acme::plugin::{DnsPlugin, PluginData};
 
index fbb93f3558319b9c20a9d6e44fb9f25b42dd835c..154a2e844bd2febb64ad881c4378a54708e2a20b 100644 (file)
@@ -54,7 +54,7 @@ use pbs_tools::blocking::WrappedReaderStream;
 use pbs_tools::stream::{AsyncReaderStream, AsyncChannelWriter};
 use pbs_tools::json::{required_integer_param, required_string_param};
 use pbs_config::CachedUserInfo;
-use proxmox_rest_server::formatter;
+use proxmox_rest_server::{WorkerTask, formatter};
 
 use crate::api2::node::rrd::create_value_from_rrd;
 use crate::backup::{
@@ -62,7 +62,7 @@ use crate::backup::{
     DataStore, LocalChunkReader,
 };
 
-use crate::server::{jobstate::Job, WorkerTask};
+use crate::server::jobstate::Job;
 
 
 const GROUP_NOTES_FILE_NAME: &str = "notes";
index 306f91eed6c4395def8876d34054ed89b759021c..8112737eca45af0d68d3e24e699e3ba4dc2a2f3b 100644 (file)
@@ -15,10 +15,10 @@ use pbs_datastore::backup_info::{BackupDir, BackupInfo};
 use pbs_datastore::dynamic_index::DynamicIndexWriter;
 use pbs_datastore::fixed_index::FixedIndexWriter;
 use pbs_api_types::Authid;
-use proxmox_rest_server::formatter::*;
+use proxmox_rest_server::{WorkerTask, formatter::*};
 
 use crate::backup::{verify_backup_dir_with_lock, DataStore};
-use crate::server::WorkerTask;
+
 use hyper::{Body, Response};
 
 #[derive(Copy, Clone, Serialize)]
index c14f19a400f16a322766a1e20d9c2d6711a0f5aa..4333be1717f226158d06e397ab6358d1f40d96fd 100644 (file)
@@ -23,8 +23,9 @@ use pbs_datastore::PROXMOX_BACKUP_PROTOCOL_ID_V1;
 use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{archive_type, ArchiveType};
+use proxmox_rest_server::WorkerTask;
 
-use crate::server::{WorkerTask, H2Service};
+use crate::server::H2Service;
 use crate::backup::DataStore;
 use pbs_config::CachedUserInfo;
 
index 564cafae78fb2f3bc4ad60a2add022a57fd93ca6..8593acac7c093cc5164eba488d62a5b9ed453ecd 100644 (file)
@@ -24,7 +24,7 @@ use crate::api2::types::{AcmeAccountName, AcmeChallengeSchema, KnownAcmeDirector
 use crate::config::acme::plugin::{
     self, DnsPlugin, DnsPluginCore, DnsPluginCoreUpdater, PLUGIN_ID_SCHEMA,
 };
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 pub(crate) const ROUTER: Router = Router::new()
     .get(&list_subdirs_api_method!(SUBDIRS))
index 0e6529f88dd947f16cbbbadbe42f43dbee5ed399..0f9234caf78fe1924b0a8515af56cb25d8d6c382 100644 (file)
@@ -9,7 +9,6 @@ use proxmox::api::section_config::SectionConfigData;
 use proxmox::api::schema::{ApiType, parse_property_string};
 
 use pbs_datastore::chunk_store::ChunkStore;
-use pbs_datastore::task::TaskState;
 use pbs_config::BackupLockGuard;
 use pbs_api_types::{
     Authid, DatastoreNotify,
@@ -17,6 +16,7 @@ use pbs_api_types::{
     PRIV_DATASTORE_ALLOCATE, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_MODIFY,
     DataStoreConfig, DataStoreConfigUpdater,
 };
+use pbs_tools::task::TaskState;
 
 use crate::api2::config::sync::delete_sync_job;
 use crate::api2::config::verify::delete_verification_job;
@@ -26,8 +26,9 @@ use crate::api2::admin::{
     verify::list_verification_jobs,
 };
 use pbs_config::CachedUserInfo;
+use proxmox_rest_server::WorkerTask;
 
-use crate::server::{jobstate, WorkerTask};
+use crate::server::jobstate;
 
 #[api(
     input: {
index f02920c039002cdffd1834940420c6c24871e859..4fd815924b3c977386ab13407d346b4f8d0e5395 100644 (file)
@@ -19,7 +19,7 @@ use pbs_api_types::{
 };
 
 use crate::config::node;
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 use crate::tools::{
     apt,
     pbs_simple_http,
index 7b31861e04ed783a198d99d9be41f07cbdbf3ef4..80733fe90fc19cc893400907fdb8383b52337448 100644 (file)
@@ -18,7 +18,7 @@ use pbs_tools::cert;
 use crate::acme::AcmeClient;
 use crate::api2::types::AcmeDomain;
 use crate::config::node::NodeConfig;
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 pub const ROUTER: Router = Router::new()
     .get(&list_subdirs_api_method!(SUBDIRS))
index 49127586e7dd94c36a7b40859070f8e144d74f56..91369643e6525e3fc7dd0a1815cf3096b71adce2 100644 (file)
@@ -17,7 +17,7 @@ use crate::tools::disks::{
 };
 use crate::tools::systemd::{self, types::*};
 
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 const BASE_MOUNT_DIR: &str = "/mnt/datastore/";
 
index b4001a540b3dcb3d035ce815c16750fe0d86bb81..f08c340b9399dd37395d712a01b58eb9508a13fc 100644 (file)
@@ -15,7 +15,7 @@ use crate::tools::disks::{
     DiskUsageInfo, DiskUsageType, DiskManage, SmartData,
     get_disks, get_smart_data, get_disk_usage_info, inititialize_gpt_disk,
 };
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 pub mod directory;
 pub mod zfs;
index 8a6cb708ed5529ba7415b9c4b68486e8cdd8f8c5..efea9b70eadca1f717dd79bc1fe1eaf72549a765 100644 (file)
@@ -19,7 +19,7 @@ use crate::tools::disks::{
     DiskUsageType,
 };
 
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 
 #[api(
index 8e357311f18b9a721bd60e84efe9ec09cab58c6c..7a5bb64e66021374f1c58af03444adac3b4f6f77 100644 (file)
@@ -24,7 +24,7 @@ use pbs_api_types::{Authid, NODE_SCHEMA, PRIV_SYS_CONSOLE};
 use pbs_tools::auth::private_auth_key;
 use pbs_tools::ticket::{self, Empty, Ticket};
 
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 use crate::tools;
 
 pub mod apt;
index 0fde9f2ac5ac405cb7afdd8c8088cdc36377fe7e..d496b5f8ea9dc003785ec9acde097d990df586c9 100644 (file)
@@ -13,7 +13,7 @@ use pbs_api_types::{
 };
 use pbs_config::network::{self, NetworkConfig};
 
-use crate::server::{WorkerTask};
+use proxmox_rest_server::WorkerTask;
 
 fn split_interface_list(list: &str) -> Result<Vec<String>, Error> {
     let value = parse_property_string(&list, &NETWORK_INTERFACE_ARRAY_SCHEMA)?;
index 6c757f43a492e49c5c005f0a86e61b50c5b48bd8..8df0fb24b9d59b075aa375b015b5eeac1921050e 100644 (file)
@@ -9,7 +9,7 @@ use proxmox::api::router::SubdirMap;
 
 use pbs_api_types::{Authid, NODE_SCHEMA, SERVICE_ID_SCHEMA, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY};
 
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 static SERVICE_NAME_LIST: [&str; 7] = [
     "proxmox-backup",
index df4673a1453a697e28d3a98e90a320a2e34109f9..0d2b456c05e2e416ed8d77a716ed596bbe7e7e64 100644 (file)
@@ -16,7 +16,8 @@ use pbs_api_types::{
 };
 
 use crate::api2::pull::check_pull_privs;
-use crate::server::{self, upid_log_path, upid_read_status, TaskState, TaskListInfoIterator};
+
+use proxmox_rest_server::{upid_log_path, upid_read_status, TaskState, TaskListInfoIterator};
 use pbs_config::CachedUserInfo;
 
 // matches respective job execution privileges
@@ -125,6 +126,25 @@ pub fn tasktype(state: &TaskState) -> TaskStateType {
     }
 }
 
+/// Convert a rest-server task list entry into the PBS API list item type.
+fn into_task_list_item(info: proxmox_rest_server::TaskListInfo) -> pbs_api_types::TaskListItem {
+    // split the optional final task state into its end time and a
+    // human readable status string
+    let (endtime, status) = info
+        .state
+        .map_or_else(|| (None, None), |a| (Some(a.endtime()), Some(a.to_string())));
+
+    pbs_api_types::TaskListItem {
+        upid: info.upid_str,
+        node: "localhost".to_string(), // this API always reports the local node
+        pid: info.upid.pid as i64,
+        pstart: info.upid.pstart,
+        starttime: info.upid.starttime,
+        worker_type: info.upid.worker_type,
+        worker_id: info.upid.worker_id,
+        user: info.upid.auth_id,
+        endtime,
+        status,
+    }
+}
+
 #[api(
     input: {
         properties: {
@@ -217,7 +237,7 @@ async fn get_task_status(
         result["tokenid"] = Value::from(task_auth_id.tokenname().unwrap().as_str());
     }
 
-    if crate::server::worker_is_active(&upid).await? {
+    if proxmox_rest_server::worker_is_active(&upid).await? {
         result["status"] = Value::from("running");
     } else {
         let exitstatus = upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
@@ -314,7 +334,7 @@ async fn read_task_log(
     rpcenv["total"] = Value::from(count);
 
     if test_status {
-        let active = crate::server::worker_is_active(&upid).await?;
+        let active = proxmox_rest_server::worker_is_active(&upid).await?;
         rpcenv["active"] = Value::from(active);
     }
 
@@ -354,7 +374,7 @@ fn stop_task(
         user_info.check_privs(&auth_id, &["system", "tasks"], PRIV_SYS_MODIFY, false)?;
     }
 
-    server::abort_worker_async(upid);
+    proxmox_rest_server::abort_worker_async(upid);
 
     Ok(Value::Null)
 }
@@ -502,7 +522,7 @@ pub fn list_tasks(
 
         match (&info.state, &statusfilter) {
             (Some(_), _) if running => continue,
-            (Some(crate::server::TaskState::OK { .. }), _) if errors => continue,
+            (Some(TaskState::OK { .. }), _) if errors => continue,
             (Some(state), Some(filters)) => {
                 if !filters.contains(&tasktype(state)) {
                     continue;
@@ -517,7 +537,7 @@ pub fn list_tasks(
             continue;
         }
 
-        result.push(info.into());
+        result.push(into_task_list_item(info));
 
         if result.len() >= limit {
             break;
index e631920fb63a6073c2cf47dedce058d66b98f656..0240098d2052f57f7eaaa05327db45b934b74984 100644 (file)
@@ -13,8 +13,9 @@ use pbs_api_types::{
     DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
     PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ,
 };
+use proxmox_rest_server::WorkerTask;
 
-use crate::server::{WorkerTask, jobstate::Job, pull::pull_store};
+use crate::server::{jobstate::Job, pull::pull_store};
 use crate::backup::DataStore;
 use pbs_config::CachedUserInfo;
 
index f7d79072df8d9a21991be440e81df97c1b4dd350..c85ec06992235bf0acdc67527bfa10253a7b72b7 100644 (file)
@@ -10,7 +10,7 @@ use pbs_api_types::Authid;
 use proxmox_rest_server::formatter::*;
 
 use crate::backup::DataStore;
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
 
 //use proxmox::tools;
 
index fada952c5d2f5096cbaae3cf734184f2587370bf..c663e9ae4072638f0dc7b9e89786ac6b1a0cdb0e 100644 (file)
@@ -39,12 +39,12 @@ use pbs_datastore::backup_info::BackupDir;
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{archive_type, ArchiveType};
 use pbs_config::CachedUserInfo;
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     api2::helpers,
     backup::DataStore,
     server::{
-        WorkerTask,
         H2Service,
     },
 };
index fadbfa3d20245ba0f7a07a6445bc4900ebacd212..5effa99da372d933e7d096db848e05633ab2b027 100644 (file)
@@ -20,10 +20,11 @@ use pbs_api_types::{
     UPID_SCHEMA, JOB_ID_SCHEMA, PRIV_DATASTORE_READ, PRIV_TAPE_AUDIT, PRIV_TAPE_WRITE,
 };
 
-use pbs_datastore::{task_log, task_warn, StoreProgress};
+use pbs_datastore::StoreProgress;
 use pbs_datastore::backup_info::{BackupDir, BackupInfo};
-use pbs_datastore::task::TaskState;
+use pbs_tools::{task_log, task_warn, task::TaskState};
 use pbs_config::CachedUserInfo;
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     server::{
@@ -36,7 +37,6 @@ use crate::{
         },
     },
     backup::{DataStore, SnapshotReader},
-    server::WorkerTask,
     tape::{
         TAPE_STATUS_DIR,
         Inventory,
index 10aa6842260d504df874dcd931948f8bb52f5eb1..8227f659470bfea36a77cfbedc24a1784e69d790 100644 (file)
@@ -28,7 +28,6 @@ use pbs_api_types::{
     LtoDriveAndMediaStatus, Lp17VolumeStatistics,
 };
  
-use pbs_datastore::task_log;
 use pbs_api_types::{PRIV_TAPE_AUDIT, PRIV_TAPE_READ, PRIV_TAPE_WRITE};
 use pbs_config::CachedUserInfo;
 use pbs_tape::{
@@ -36,13 +35,14 @@ use pbs_tape::{
     sg_tape::tape_alert_flags_critical,
     linux_list_drives::{lto_tape_device_list, lookup_device_identification, open_lto_tape_device},
 };
+use pbs_tools::task_log;
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     api2::tape::restore::{
         fast_catalog_restore,
         restore_media,
     },
-    server::WorkerTask,
     tape::{
         TAPE_STATUS_DIR,
         Inventory,
index 7739d1a40365a0de032edb3807c6ad4cd61dd2d2..045d8d6ce66bbbcaf0fb1d5599598e7322f0e370 100644 (file)
@@ -34,26 +34,24 @@ use pbs_api_types::{
     UPID_SCHEMA, TAPE_RESTORE_SNAPSHOT_SCHEMA,
     PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_TAPE_READ,
 };
-use pbs_datastore::{task_log, task_warn, DataBlob};
+use pbs_datastore::DataBlob;
 use pbs_datastore::backup_info::BackupDir;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, MANIFEST_BLOB_NAME};
-use pbs_datastore::task::TaskState;
 use pbs_config::CachedUserInfo;
 use pbs_tape::{
     TapeRead, BlockReadError, MediaContentHeader,
     PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0,
 };
+use pbs_tools::{task_log, task_warn, task::TaskState};
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     tools::ParallelHandler,
     backup::DataStore,
-    server::{
-        lookup_user_email,
-        WorkerTask,
-    },
+    server::lookup_user_email,
     tape::{
         TAPE_STATUS_DIR,
         MediaId,
index df8d46b67614b691ca071fd9c24e29a75d7d892f..fcef2d399408d3c520e8b0e01d2d3ed97887f2d9 100644 (file)
@@ -12,7 +12,6 @@ use lazy_static::lazy_static;
 use proxmox::tools::fs::{replace_file, file_read_optional_string, CreateOptions};
 
 use pbs_api_types::{UPID, DataStoreConfig, Authid, GarbageCollectionStatus};
-use pbs_datastore::{task_log, task_warn};
 use pbs_datastore::DataBlob;
 use pbs_datastore::backup_info::{BackupGroup, BackupDir};
 use pbs_datastore::chunk_store::ChunkStore;
@@ -24,10 +23,10 @@ use pbs_datastore::manifest::{
     ArchiveType, BackupManifest,
     archive_type,
 };
-use pbs_datastore::task::TaskState;
 use pbs_tools::format::HumanByte;
 use pbs_tools::fs::{lock_dir_noblock, DirLockGuard};
 use pbs_tools::process_locker::ProcessLockSharedGuard;
+use pbs_tools::{task_log, task_warn, task::TaskState};
 use pbs_config::{open_backup_lockfile, BackupLockGuard};
 use proxmox_rest_server::fail_on_shutdown;
 
index b8d2b2f3c0da58d7eab1fd7d410ae1434239126b..051d69186112f3d75515927d10dd9e2e68596e82 100644 (file)
@@ -7,12 +7,12 @@ use std::time::Instant;
 use anyhow::{bail, format_err, Error};
 
 use pbs_api_types::{Authid, CryptMode, VerifyState, UPID, SnapshotVerifyState};
-use pbs_datastore::{task_log, DataBlob, StoreProgress};
+use pbs_datastore::{DataBlob, StoreProgress};
 use pbs_datastore::backup_info::{BackupGroup, BackupDir, BackupInfo};
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, FileInfo};
-use pbs_datastore::task::TaskState;
 use pbs_tools::fs::lock_dir_noblock_shared;
+use pbs_tools::{task_log, task::TaskState};
 
 use crate::{
     backup::DataStore,
index 9901b85d6efdfd5ac178f7e1db4c6e304e83045b..86650de6227b8f807c6ad55423aec42fd96650c8 100644 (file)
@@ -10,14 +10,9 @@ use proxmox::api::RpcEnvironmentType;
 use proxmox::tools::fs::CreateOptions;
 
 use pbs_tools::auth::private_auth_key;
-use proxmox_rest_server::{ApiConfig, RestServer};
-
-use proxmox_backup::server::{
-    self,
-    auth::default_api_auth,
-};
-use proxmox_rest_server::daemon;
+use proxmox_rest_server::{daemon, ApiConfig, RestServer};
 
+use proxmox_backup::server::auth::default_api_auth;
 use proxmox_backup::auth_helpers::*;
 use proxmox_backup::config;
 
@@ -86,7 +81,7 @@ async fn run() -> Result<(), Error> {
     )?;
 
     let backup_user = pbs_config::backup_user()?;
-    let mut commando_sock = proxmox_rest_server::CommandoSocket::new(crate::server::our_ctrl_sock(), backup_user.gid);
+    let mut commando_sock = proxmox_rest_server::CommandoSocket::new(proxmox_rest_server::our_ctrl_sock(), backup_user.gid);
 
     let dir_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
     let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
@@ -107,7 +102,7 @@ async fn run() -> Result<(), Error> {
 
 
     let rest_server = RestServer::new(config);
-    proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
+    proxmox_rest_server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
 
     // http server future:
     let server = daemon::create_daemon(
@@ -130,11 +125,11 @@ async fn run() -> Result<(), Error> {
         "proxmox-backup.service",
     );
 
-    server::write_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
+    proxmox_rest_server::write_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
     daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
 
     let init_result: Result<(), Error> = try_block!({
-        server::register_task_control_commands(&mut commando_sock)?;
+        proxmox_rest_server::register_task_control_commands(&mut commando_sock)?;
         commando_sock.spawn()?;
         proxmox_rest_server::server_state_init()?;
         Ok(())
index 689f44db156e43e590542e20fb0bdb387f28f91d..b9e4e2ffe9c6d8c2abad21ac762cf20a1237e19b 100644 (file)
@@ -14,9 +14,10 @@ use pbs_api_types::{
     IGNORE_VERIFIED_BACKUPS_SCHEMA, VERIFICATION_OUTDATED_AFTER_SCHEMA,
 };
 
+use proxmox_rest_server::wait_for_local_worker;
+
 use proxmox_backup::config;
 use proxmox_backup::api2;
-use proxmox_backup::server::wait_for_local_worker;
 
 mod proxmox_backup_manager;
 use proxmox_backup_manager::*;
index 5d8ed189f4a9abfc98b1dfb014262ba3a5bc3328..ec4da15b75379d0a7683169044368481369dc16e 100644 (file)
@@ -19,18 +19,16 @@ use proxmox::api::RpcEnvironmentType;
 use proxmox::sys::linux::socket::set_tcp_keepalive;
 use proxmox::tools::fs::CreateOptions;
 
-use proxmox_rest_server::{ApiConfig, RestServer};
+use proxmox_rest_server::{rotate_task_log_archive, ApiConfig, RestServer, WorkerTask};
 
 use proxmox_backup::{
     backup::DataStore,
     server::{
         auth::default_api_auth,
-        WorkerTask,
         jobstate::{
             self,
             Job,
         },
-        rotate_task_log_archive,
     },
 };
 
@@ -188,7 +186,7 @@ async fn run() -> Result<(), Error> {
     config.register_template("console", "/usr/share/pve-xtermjs/index.html.hbs")?;
 
     let backup_user = pbs_config::backup_user()?;
-    let mut commando_sock = proxmox_rest_server::CommandoSocket::new(crate::server::our_ctrl_sock(), backup_user.gid);
+    let mut commando_sock = proxmox_rest_server::CommandoSocket::new(proxmox_rest_server::our_ctrl_sock(), backup_user.gid);
 
     let dir_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
     let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
@@ -208,7 +206,7 @@ async fn run() -> Result<(), Error> {
     )?;
 
     let rest_server = RestServer::new(config);
-    proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
+    proxmox_rest_server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
 
     //openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
 
@@ -266,11 +264,11 @@ async fn run() -> Result<(), Error> {
         "proxmox-backup-proxy.service",
     );
 
-    server::write_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
+    proxmox_rest_server::write_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
     daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
 
     let init_result: Result<(), Error> = try_block!({
-        server::register_task_control_commands(&mut commando_sock)?;
+        proxmox_rest_server::register_task_control_commands(&mut commando_sock)?;
         commando_sock.spawn()?;
         proxmox_rest_server::server_state_init()?;
         Ok(())
@@ -806,11 +804,11 @@ async fn schedule_task_log_rotate() {
 async fn command_reopen_access_logfiles() -> Result<(), Error> {
     // only care about the most recent daemon instance for each, proxy & api, as other older ones
     // should not respond to new requests anyway, but only finish their current one and then exit.
-    let sock = crate::server::our_ctrl_sock();
+    let sock = proxmox_rest_server::our_ctrl_sock();
     let f1 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
 
-    let pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
-    let sock = crate::server::ctrl_sock_from_pid(pid);
+    let pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
+    let sock = proxmox_rest_server::ctrl_sock_from_pid(pid);
     let f2 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
 
     match futures::join!(f1, f2) {
@@ -824,11 +822,11 @@ async fn command_reopen_access_logfiles() -> Result<(), Error> {
 async fn command_reopen_auth_logfiles() -> Result<(), Error> {
     // only care about the most recent daemon instance for each, proxy & api, as other older ones
     // should not respond to new requests anyway, but only finish their current one and then exit.
-    let sock = crate::server::our_ctrl_sock();
+    let sock = proxmox_rest_server::our_ctrl_sock();
     let f1 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-auth-log-reopen\"}\n");
 
-    let pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
-    let sock = crate::server::ctrl_sock_from_pid(pid);
+    let pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
+    let sock = proxmox_rest_server::ctrl_sock_from_pid(pid);
     let f2 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-auth-log-reopen\"}\n");
 
     match futures::join!(f1, f2) {
index c1580b97bab7b73d0e7852f6121e035bbaa6fe12..09a768b12f48d8428b4788c9840b6e3487b7eb72 100644 (file)
@@ -11,7 +11,7 @@ async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
     let sleep_duration = core::time::Duration::new(0, 100_000_000);
 
     loop {
-        if !proxmox_backup::server::worker_is_active_local(&upid) {
+        if !proxmox_rest_server::worker_is_active_local(&upid) {
             break;
         }
         tokio::time::sleep(sleep_duration).await;
index bebe9ddc068505a8b5b9951d2961912eee86b019..003f6677c273f0227f05fe73cca5cccda10d7fbd 100644 (file)
@@ -235,12 +235,12 @@ async fn handle_worker(upid_str: &str) -> Result<(), Error> {
     let abort_future = async move {
         while signal_stream.recv().await.is_some() {
             println!("got shutdown request (SIGINT)");
-            proxmox_backup::server::abort_local_worker(upid.clone());
+            proxmox_rest_server::abort_local_worker(upid.clone());
         }
         Ok::<_, Error>(())
     };
 
-    let result_future = proxmox_backup::server::wait_for_local_worker(upid_str);
+    let result_future = proxmox_rest_server::wait_for_local_worker(upid_str);
 
     futures::select! {
         result = result_future.fuse() => result?,
index 317f4a36cebc125c10bb887befb98e0e10ba4d79..608b58317c9afd727b2eab658ea2ddb0e36240c8 100644 (file)
@@ -2,9 +2,9 @@ use std::sync::Arc;
 use anyhow::Error;
 
 use pbs_api_types::Authid;
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
-    server::WorkerTask,
     server::jobstate::Job,
     backup::DataStore,
 };
index 41d628be713c741cccf68b8fe202c8796cbd0626..0b51a7107f225cff2674b42748d54b4248828d46 100644 (file)
@@ -11,11 +11,9 @@ use hyper::{Body, Request, Response, StatusCode};
 use proxmox::api::{ApiResponseFuture, HttpError, Router, RpcEnvironment};
 use proxmox::http_err;
 
-use proxmox_rest_server::normalize_uri_path;
+use proxmox_rest_server::{normalize_uri_path, WorkerTask};
 use proxmox_rest_server::formatter::*;
 
-use crate::server::WorkerTask;
-
 /// Hyper Service implementation to handle stateful H2 connections.
 ///
 /// We use this kind of service to handle backup protocol
index 74224f33df4b0b790f17be98fa348b9d54e17be4..64feb8780440476bedae222acd7b34f7e391fd9d 100644 (file)
@@ -14,7 +14,7 @@
 //! an example usage would be
 //! ```no_run
 //! # use anyhow::{bail, Error};
-//! # use proxmox_backup::server::TaskState;
+//! # use proxmox_rest_server::TaskState;
 //! # use proxmox_backup::server::jobstate::*;
 //! # fn some_code() -> TaskState { TaskState::OK { endtime: 0 } }
 //! # fn code() -> Result<(), Error> {
@@ -50,11 +50,7 @@ use proxmox_systemd::time::{compute_next_event, parse_calendar_event};
 use pbs_config::{open_backup_lockfile, BackupLockGuard};
 use pbs_api_types::{UPID, JobScheduleStatus};
 
-use crate::server::{
-    TaskState,
-    upid_read_status,
-    worker_is_active_local,
-};
+use proxmox_rest_server::{upid_read_status, worker_is_active_local, TaskState};
 
 #[derive(Serialize, Deserialize)]
 #[serde(rename_all = "kebab-case")]
index 77320da6d789f1efd1e47ae7dabcdc9d409b99fb..96d57bd4e232a1182fef7adb8f159386e5cb62eb 100644 (file)
@@ -4,51 +4,13 @@
 //! services. We want async IO, so this is built on top of
 //! tokio/hyper.
 
-use anyhow::{format_err, Error};
-use lazy_static::lazy_static;
-use nix::unistd::Pid;
+use anyhow::Error;
 use serde_json::Value;
 
-use proxmox::sys::linux::procfs::PidStat;
 use proxmox::tools::fs::{create_path, CreateOptions};
 
 use pbs_buildcfg;
 
-lazy_static! {
-    static ref PID: i32 = unsafe { libc::getpid() };
-    static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime;
-}
-
-pub fn pid() -> i32 {
-    *PID
-}
-
-pub fn pstart() -> u64 {
-    *PSTART
-}
-
-pub fn write_pid(pid_fn: &str) -> Result<(), Error> {
-    let pid_str = format!("{}\n", *PID);
-    proxmox::tools::fs::replace_file(pid_fn, pid_str.as_bytes(), CreateOptions::new())
-}
-
-pub fn read_pid(pid_fn: &str) -> Result<i32, Error> {
-    let pid = proxmox::tools::fs::file_get_contents(pid_fn)?;
-    let pid = std::str::from_utf8(&pid)?.trim();
-    pid.parse().map_err(|err| format_err!("could not parse pid - {}", err))
-}
-
-pub fn ctrl_sock_from_pid(pid: i32) -> String {
-    format!("\0{}/control-{}.sock", pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, pid)
-}
-
-pub fn our_ctrl_sock() -> String {
-    ctrl_sock_from_pid(*PID)
-}
-
-mod worker_task;
-pub use worker_task::*;
-
 mod h2service;
 pub use h2service::*;
 
@@ -76,16 +38,16 @@ pub mod auth;
 pub mod pull;
 
 pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
-    let proxy_pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
-    let sock = crate::server::ctrl_sock_from_pid(proxy_pid);
+    let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
+    let sock = proxmox_rest_server::ctrl_sock_from_pid(proxy_pid);
     let _: Value = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"reload-certificate\"}\n")
         .await?;
     Ok(())
 }
 
 pub(crate) async fn notify_datastore_removed() -> Result<(), Error> {
-    let proxy_pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
-    let sock = crate::server::ctrl_sock_from_pid(proxy_pid);
+    let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
+    let sock = proxmox_rest_server::ctrl_sock_from_pid(proxy_pid);
     let _: Value = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"datastore-removed\"}\n")
         .await?;
     Ok(())
index 8d971a1c981319c2d1c8acd446f45fe13cd2e07d..537401872d24f7439a82c0fa03796e358aaa3cb9 100644 (file)
@@ -2,17 +2,17 @@ use std::sync::Arc;
 
 use anyhow::Error;
 
-use pbs_datastore::{task_log, task_warn};
 use pbs_datastore::backup_info::BackupInfo;
 use pbs_datastore::prune::compute_prune_info;
 use pbs_api_types::{Authid, PRIV_DATASTORE_MODIFY, PruneOptions};
 use pbs_config::CachedUserInfo;
+use pbs_tools::{task_log, task_warn};
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     backup::DataStore,
     server::jobstate::Job,
-    server::WorkerTask,
-};
+ };
 
 pub fn prune_datastore(
     worker: Arc<WorkerTask>,
index 5214a218f1524cd980bd521fbf88cdd0118007be..f913ac8ac4c1301a1a105805ee0125c0b090a6ad 100644 (file)
@@ -13,7 +13,7 @@ use serde_json::json;
 use proxmox::api::error::{HttpError, StatusCode};
 
 use pbs_api_types::{Authid, SnapshotListItem, GroupListItem};
-use pbs_datastore::{task_log, BackupInfo, BackupDir, BackupGroup, StoreProgress};
+use pbs_datastore::{BackupInfo, BackupDir, BackupGroup, StoreProgress};
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
@@ -22,11 +22,12 @@ use pbs_datastore::manifest::{
     CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type
 };
 use pbs_tools::sha::sha256;
+use pbs_tools::task_log;
 use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader};
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     backup::DataStore,
-    server::WorkerTask,
     tools::ParallelHandler,
 };
 
index 6005b706e21b30296a3e9e04cc1c2bf23a68205f..62fa6fa889b7e4b4d0886884b3b64033d094cee0 100644 (file)
@@ -1,10 +1,10 @@
 use anyhow::{format_err, Error};
 
-use pbs_datastore::task_log;
+use pbs_tools::task_log;
 use pbs_api_types::{Authid, VerificationJobConfig};
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
-    server::WorkerTask,
     server::jobstate::Job,
     backup::{
         DataStore,
diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
deleted file mode 100644 (file)
index 94ffbeb..0000000
+++ /dev/null
@@ -1,923 +0,0 @@
-use std::collections::{HashMap, VecDeque};
-use std::fs::File;
-use std::path::PathBuf;
-use std::io::{Read, Write, BufRead, BufReader};
-use std::panic::UnwindSafe;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, Mutex};
-
-use anyhow::{bail, format_err, Error};
-use futures::*;
-use lazy_static::lazy_static;
-use serde_json::{json, Value};
-use serde::{Serialize, Deserialize};
-use tokio::sync::oneshot;
-use nix::fcntl::OFlag;
-use once_cell::sync::OnceCell;
-
-use proxmox::sys::linux::procfs;
-use proxmox::try_block;
-use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
-use proxmox::api::upid::UPID;
-
-use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
-use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
-
-struct TaskListLockGuard(File);
-
-struct WorkerTaskSetup {
-    file_opts: CreateOptions,
-    taskdir: PathBuf,
-    task_lock_fn: PathBuf,
-    active_tasks_fn: PathBuf,
-    task_index_fn: PathBuf,
-    task_archive_fn: PathBuf,
-}
-
-static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
-
-fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
-    WORKER_TASK_SETUP.get()
-        .ok_or_else(|| format_err!("WorkerTask library is not initialized"))
-}
-
-impl WorkerTaskSetup {
-
-    fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {
-
-        let mut taskdir = basedir.clone();
-        taskdir.push("tasks");
-
-        let mut task_lock_fn = taskdir.clone();
-        task_lock_fn.push(".active.lock");
-
-        let mut active_tasks_fn = taskdir.clone();
-        active_tasks_fn.push("active");
-
-        let mut task_index_fn = taskdir.clone();
-        task_index_fn.push("index");
-
-        let mut task_archive_fn = taskdir.clone();
-        task_archive_fn.push("archive");
-
-        Self {
-            file_opts,
-            taskdir,
-            task_lock_fn,
-            active_tasks_fn,
-            task_index_fn,
-            task_archive_fn,
-        }
-    }
-
-    fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {
-        let options =  self.file_opts.clone()
-            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
-
-        let timeout = std::time::Duration::new(10, 0);
-
-        let file = proxmox::tools::fs::open_file_locked(
-            &self.task_lock_fn,
-            timeout,
-            exclusive,
-            options,
-        )?;
-
-        Ok(TaskListLockGuard(file))
-    }
-
-    fn log_path(&self, upid: &UPID) -> std::path::PathBuf {
-        let mut path = self.taskdir.clone();
-        path.push(format!("{:02X}", upid.pstart % 256));
-        path.push(upid.to_string());
-        path
-    }
-
-    // atomically read/update the task list, update status of finished tasks
-    // new_upid is added to the list when specified.
-    fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {
-
-        let lock = self.lock_task_list_files(true)?;
-
-        // TODO remove with 1.x
-        let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?;
-        let had_index_file = !finish_list.is_empty();
-
-        // We use filter_map because one negative case wants to *move* the data into `finish_list`,
-        // clippy doesn't quite catch this!
-        #[allow(clippy::unnecessary_filter_map)]
-        let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)?
-            .into_iter()
-            .filter_map(|info| {
-                if info.state.is_some() {
-                    // this can happen when the active file still includes finished tasks
-                    finish_list.push(info);
-                    return None;
-                }
-
-                if !worker_is_active_local(&info.upid) {
-                    // println!("Detected stopped task '{}'", &info.upid_str);
-                    let now = proxmox::tools::time::epoch_i64();
-                    let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
-                    finish_list.push(TaskListInfo {
-                        upid: info.upid,
-                        upid_str: info.upid_str,
-                        state: Some(status)
-                    });
-                    return None;
-                }
-
-                Some(info)
-            }).collect();
-
-        if let Some(upid) = new_upid {
-            active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
-        }
-
-        let active_raw = render_task_list(&active_list);
-
-        let options =  self.file_opts.clone()
-            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
-
-        replace_file(
-            &self.active_tasks_fn,
-            active_raw.as_bytes(),
-            options,
-        )?;
-
-        finish_list.sort_unstable_by(|a, b| {
-            match (&a.state, &b.state) {
-                (Some(s1), Some(s2)) => s1.cmp(&s2),
-                (Some(_), None) => std::cmp::Ordering::Less,
-                (None, Some(_)) => std::cmp::Ordering::Greater,
-                _ => a.upid.starttime.cmp(&b.upid.starttime),
-            }
-        });
-
-        if !finish_list.is_empty() {
-            let options =  self.file_opts.clone()
-                .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
-
-            let mut writer = atomic_open_or_create_file(
-                &self.task_archive_fn,
-                OFlag::O_APPEND | OFlag::O_RDWR,
-                &[],
-                options,
-            )?;
-            for info in &finish_list {
-                writer.write_all(render_task_line(&info).as_bytes())?;
-            }
-        }
-
-        // TODO Remove with 1.x
-        // for compatibility, if we had an INDEX file, we do not need it anymore
-        if had_index_file {
-            let _ = nix::unistd::unlink(&self.task_index_fn);
-        }
-
-        drop(lock);
-
-        Ok(())
-    }
-
-    // Create task log directory with correct permissions
-    fn create_task_log_dirs(&self) -> Result<(), Error> {
-
-        try_block!({
-            let dir_opts = self.file_opts.clone()
-                .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
-
-            create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?;
-            // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
-            Ok(())
-        }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
-    }
-}
-
-/// Initialize the WorkerTask library
-pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
-    let setup = WorkerTaskSetup::new(basedir, file_opts);
-    setup.create_task_log_dirs()?;
-    WORKER_TASK_SETUP.set(setup)
-        .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
-}
-
-/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
-/// rotates it if it is
-pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
-
-    let setup = worker_task_setup()?;
-
-    let _lock = setup.lock_task_list_files(true)?;
-
-    let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress)
-            .ok_or_else(|| format_err!("could not get archive file names"))?;
-
-    logrotate.rotate(size_threshold, None, max_files)
-}
-
-
-/// Path to the worker log file
-pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
-    let setup = worker_task_setup()?;
-    Ok(setup.log_path(upid))
-}
-
-/// Read endtime (time of last log line) and exitstatus from task log file
-/// If there is not a single line with at valid datetime, we assume the
-/// starttime to be the endtime
-pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
-
-    let setup = worker_task_setup()?;
-
-    let mut status = TaskState::Unknown { endtime: upid.starttime };
-
-    let path = setup.log_path(upid);
-
-    let mut file = File::open(path)?;
-
-    /// speedup - only read tail
-    use std::io::Seek;
-    use std::io::SeekFrom;
-    let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
-
-    let mut data = Vec::with_capacity(8192);
-    file.read_to_end(&mut data)?;
-
-    // strip newlines at the end of the task logs
-    while data.last() == Some(&b'\n') {
-        data.pop();
-    }
-
-    let last_line = match data.iter().rposition(|c| *c == b'\n') {
-        Some(start) if data.len() > (start+1) => &data[start+1..],
-        Some(_) => &data, // should not happen, since we removed all trailing newlines
-        None => &data,
-    };
-
-    let last_line = std::str::from_utf8(last_line)
-        .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
-
-    let mut iter = last_line.splitn(2, ": ");
-    if let Some(time_str) = iter.next() {
-        if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
-            // set the endtime even if we cannot parse the state
-            status = TaskState::Unknown { endtime };
-            if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
-                if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
-                    status = state;
-                }
-            }
-        }
-    }
-
-    Ok(status)
-}
-
-lazy_static! {
-    static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
-}
-
-/// checks if the task UPID refers to a worker from this process
-fn is_local_worker(upid: &UPID) -> bool {
-    upid.pid == crate::server::pid() && upid.pstart == crate::server::pstart()
-}
-
-/// Test if the task is still running
-pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
-    if is_local_worker(upid) {
-        return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
-    }
-
-    if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() {
-        return Ok(false);
-    }
-
-    let sock = crate::server::ctrl_sock_from_pid(upid.pid);
-    let cmd = json!({
-        "command": "worker-task-status",
-        "args": {
-            "upid": upid.to_string(),
-        },
-    });
-    let status = proxmox_rest_server::send_command(sock, &cmd).await?;
-
-    if let Some(active) = status.as_bool() {
-        Ok(active)
-    } else {
-        bail!("got unexpected result {:?} (expected bool)", status);
-    }
-}
-
-/// Test if the task is still running (fast but inaccurate implementation)
-///
-/// If the task is spawned from a different process, we simply return if
-/// that process is still running. This information is good enough to detect
-/// stale tasks...
-pub fn worker_is_active_local(upid: &UPID) -> bool {
-    if is_local_worker(upid) {
-        WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
-    } else {
-        procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
-    }
-}
-
-pub fn register_task_control_commands(
-    commando_sock: &mut CommandoSocket,
-) -> Result<(), Error> {
-    fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
-        let args = if let Some(args) = args { args } else { bail!("missing args") };
-        let upid = match args.get("upid") {
-            Some(Value::String(upid)) => upid.parse::<UPID>()?,
-            None => bail!("no upid in args"),
-            _ => bail!("unable to parse upid"),
-        };
-        if !is_local_worker(&upid) {
-            bail!("upid does not belong to this process");
-        }
-        Ok(upid)
-    }
-
-    commando_sock.register_command("worker-task-abort".into(), move |args| {
-        let upid = get_upid(args)?;
-
-        abort_local_worker(upid);
-
-        Ok(Value::Null)
-    })?;
-    commando_sock.register_command("worker-task-status".into(), move |args| {
-        let upid = get_upid(args)?;
-
-        let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id);
-
-        Ok(active.into())
-    })?;
-
-    Ok(())
-}
-
-pub fn abort_worker_async(upid: UPID) {
-    tokio::spawn(async move {
-        if let Err(err) = abort_worker(upid).await {
-            eprintln!("abort worker failed - {}", err);
-        }
-    });
-}
-
-pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
-
-    let sock = crate::server::ctrl_sock_from_pid(upid.pid);
-    let cmd = json!({
-        "command": "worker-task-abort",
-        "args": {
-            "upid": upid.to_string(),
-        },
-    });
-    proxmox_rest_server::send_command(sock, &cmd).map_ok(|_| ()).await
-}
-
-fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
-
-    let data = line.splitn(3, ' ').collect::<Vec<&str>>();
-
-    let len = data.len();
-
-    match len {
-        1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)),
-        3 => {
-            let endtime = i64::from_str_radix(data[1], 16)?;
-            let state = TaskState::from_endtime_and_message(endtime, data[2])?;
-            Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state)))
-        }
-        _ => bail!("wrong number of components"),
-    }
-}
-
-/// Task State
-#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
-pub enum TaskState {
-    /// The Task ended with an undefined state
-    Unknown { endtime: i64 },
-    /// The Task ended and there were no errors or warnings
-    OK { endtime: i64 },
-    /// The Task had 'count' amount of warnings and no errors
-    Warning { count: u64, endtime: i64 },
-    /// The Task ended with the error described in 'message'
-    Error { message: String, endtime: i64 },
-}
-
-impl TaskState {
-    pub fn endtime(&self) -> i64 {
-        match *self {
-            TaskState::Unknown { endtime } => endtime,
-            TaskState::OK { endtime } => endtime,
-            TaskState::Warning { endtime, .. } => endtime,
-            TaskState::Error { endtime, .. } => endtime,
-        }
-    }
-
-    fn result_text(&self) -> String {
-        match self {
-            TaskState::Error { message, .. } => format!("TASK ERROR: {}", message),
-            other => format!("TASK {}", other),
-        }
-    }
-
-    fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> {
-        if s == "unknown" {
-            Ok(TaskState::Unknown { endtime })
-        } else if s == "OK" {
-            Ok(TaskState::OK { endtime })
-        } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") {
-            let count: u64 = warnings.parse()?;
-            Ok(TaskState::Warning{ count, endtime })
-        } else if !s.is_empty() {
-            let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string();
-            Ok(TaskState::Error{ message, endtime })
-        } else {
-            bail!("unable to parse Task Status '{}'", s);
-        }
-    }
-}
-
-impl std::cmp::PartialOrd for TaskState {
-    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
-        Some(self.endtime().cmp(&other.endtime()))
-    }
-}
-
-impl std::cmp::Ord for TaskState {
-    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
-        self.endtime().cmp(&other.endtime())
-    }
-}
-
-impl std::fmt::Display for TaskState {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match self {
-            TaskState::Unknown { .. } => write!(f, "unknown"),
-            TaskState::OK { .. }=> write!(f, "OK"),
-            TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count),
-            TaskState::Error { message, .. } => write!(f, "{}", message),
-        }
-    }
-}
-
-/// Task details including parsed UPID
-///
-/// If there is no `state`, the task is still running.
-#[derive(Debug)]
-pub struct TaskListInfo {
-    /// The parsed UPID
-    pub upid: UPID,
-    /// UPID string representation
-    pub upid_str: String,
-    /// Task `(endtime, status)` if already finished
-    pub state: Option<TaskState>, // endtime, status
-}
-
-impl Into<pbs_api_types::TaskListItem> for TaskListInfo {
-    fn into(self) -> pbs_api_types::TaskListItem {
-        let (endtime, status) = self
-            .state
-            .map_or_else(|| (None, None), |a| (Some(a.endtime()), Some(a.to_string())));
-
-        pbs_api_types::TaskListItem {
-            upid: self.upid_str,
-            node: "localhost".to_string(),
-            pid: self.upid.pid as i64,
-            pstart: self.upid.pstart,
-            starttime: self.upid.starttime,
-            worker_type: self.upid.worker_type,
-            worker_id: self.upid.worker_id,
-            user: self.upid.auth_id,
-            endtime,
-            status,
-        }
-    }
-}
-
-fn render_task_line(info: &TaskListInfo) -> String {
-    let mut raw = String::new();
-    if let Some(status) = &info.state {
-        raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status));
-    } else {
-        raw.push_str(&info.upid_str);
-        raw.push('\n');
-    }
-
-    raw
-}
-
-fn render_task_list(list: &[TaskListInfo]) -> String {
-    let mut raw = String::new();
-    for info in list {
-        raw.push_str(&render_task_line(&info));
-    }
-    raw
-}
-
-// note this is not locked, caller has to make sure it is
-// this will skip (and log) lines that are not valid status lines
-fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error>
-{
-    let reader = BufReader::new(reader);
-    let mut list = Vec::new();
-    for line in reader.lines() {
-        let line = line?;
-        match parse_worker_status_line(&line) {
-            Ok((upid_str, upid, state)) => list.push(TaskListInfo {
-                upid_str,
-                upid,
-                state
-            }),
-            Err(err) => {
-                eprintln!("unable to parse worker status '{}' - {}", line, err);
-                continue;
-            }
-        };
-    }
-
-    Ok(list)
-}
-
-// note this is not locked, caller has to make sure it is
-fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error>
-where
-    P: AsRef<std::path::Path> + std::fmt::Debug,
-{
-    let file = match File::open(&path) {
-        Ok(f) => f,
-        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
-        Err(err) => bail!("unable to open task list {:?} - {}", path, err),
-    };
-
-    read_task_file(file)
-}
-
-pub struct TaskListInfoIterator {
-    list: VecDeque<TaskListInfo>,
-    end: bool,
-    archive: Option<LogRotateFiles>,
-    lock: Option<TaskListLockGuard>,
-}
-
-impl TaskListInfoIterator {
-    pub fn new(active_only: bool) -> Result<Self, Error> {
-
-        let setup = worker_task_setup()?;
-
-        let (read_lock, active_list) = {
-            let lock = setup.lock_task_list_files(false)?;
-            let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
-
-            let needs_update = active_list
-                .iter()
-                .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
-
-            // TODO remove with 1.x
-            let index_exists = setup.task_index_fn.is_file();
-
-            if needs_update || index_exists {
-                drop(lock);
-                setup.update_active_workers(None)?;
-                let lock = setup.lock_task_list_files(false)?;
-                let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
-                (lock, active_list)
-            } else {
-                (lock, active_list)
-            }
-        };
-
-        let archive = if active_only {
-            None
-        } else {
-            let logrotate = LogRotate::new(&setup.task_archive_fn, true)
-                .ok_or_else(|| format_err!("could not get archive file names"))?;
-            Some(logrotate.files())
-        };
-
-        let lock = if active_only { None } else { Some(read_lock) };
-
-        Ok(Self {
-            list: active_list.into(),
-            end: active_only,
-            archive,
-            lock,
-        })
-    }
-}
-
-impl Iterator for TaskListInfoIterator {
-    type Item = Result<TaskListInfo, Error>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        loop {
-            if let Some(element) = self.list.pop_back() {
-                return Some(Ok(element));
-            } else if self.end {
-                    return None;
-            } else {
-                if let Some(mut archive) = self.archive.take() {
-                    if let Some(file) = archive.next() {
-                        let list = match read_task_file(file) {
-                            Ok(list) => list,
-                            Err(err) => return Some(Err(err)),
-                        };
-                        self.list.append(&mut list.into());
-                        self.archive = Some(archive);
-                        continue;
-                    }
-                }
-
-                self.end = true;
-                self.lock.take();
-            }
-        }
-    }
-}
-
-/// Launch long running worker tasks.
-///
-/// A worker task can either be a whole thread, or a simply tokio
-/// task/future. Each task can `log()` messages, which are stored
-/// persistently to files. Task should poll the `abort_requested`
-/// flag, and stop execution when requested.
-pub struct WorkerTask {
-    setup: &'static WorkerTaskSetup,
-    upid: UPID,
-    data: Mutex<WorkerTaskData>,
-    abort_requested: AtomicBool,
-}
-
-impl std::fmt::Display for WorkerTask {
-
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        self.upid.fmt(f)
-    }
-}
-
-struct WorkerTaskData {
-    logger: FileLogger,
-    progress: f64, // 0..1
-    warn_count: u64,
-    pub abort_listeners: Vec<oneshot::Sender<()>>,
-}
-
-impl WorkerTask {
-
-    pub fn new(
-        worker_type: &str,
-        worker_id: Option<String>,
-        auth_id: String,
-        to_stdout: bool,
-    ) -> Result<Arc<Self>, Error> {
-
-        let setup = worker_task_setup()?;
-
-        let upid = UPID::new(worker_type, worker_id, auth_id)?;
-        let task_id = upid.task_id;
-
-        let mut path = setup.taskdir.clone();
-
-        path.push(format!("{:02X}", upid.pstart & 255));
-
-        let dir_opts = setup.file_opts.clone()
-            .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
-
-        create_path(&path, None, Some(dir_opts))?;
-
-        path.push(upid.to_string());
-
-        let logger_options = FileLogOptions {
-            to_stdout,
-            exclusive: true,
-            prefix_time: true,
-            read: true,
-            file_opts: setup.file_opts.clone(),
-            ..Default::default()
-        };
-        let logger = FileLogger::new(&path, logger_options)?;
-
-        let worker = Arc::new(Self {
-            setup,
-            upid: upid.clone(),
-            abort_requested: AtomicBool::new(false),
-            data: Mutex::new(WorkerTaskData {
-                logger,
-                progress: 0.0,
-                warn_count: 0,
-                abort_listeners: vec![],
-            }),
-        });
-
-        // scope to drop the lock again after inserting
-        {
-            let mut hash = WORKER_TASK_LIST.lock().unwrap();
-            hash.insert(task_id, worker.clone());
-            proxmox_rest_server::set_worker_count(hash.len());
-        }
-
-        setup.update_active_workers(Some(&upid))?;
-
-        Ok(worker)
-    }
-
-    /// Spawn a new tokio task/future.
-    pub fn spawn<F, T>(
-        worker_type: &str,
-        worker_id: Option<String>,
-        auth_id: String,
-        to_stdout: bool,
-        f: F,
-    ) -> Result<String, Error>
-        where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
-              T: Send + 'static + Future<Output = Result<(), Error>>,
-    {
-        let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
-        let upid_str = worker.upid.to_string();
-        let f = f(worker.clone());
-        tokio::spawn(async move {
-            let result = f.await;
-            worker.log_result(&result);
-        });
-
-        Ok(upid_str)
-    }
-
-    /// Create a new worker thread.
-    pub fn new_thread<F>(
-        worker_type: &str,
-        worker_id: Option<String>,
-        auth_id: String,
-        to_stdout: bool,
-        f: F,
-    ) -> Result<String, Error>
-        where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>
-    {
-        let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
-        let upid_str = worker.upid.to_string();
-
-        let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || {
-            let worker1 = worker.clone();
-            let result = match std::panic::catch_unwind(move || f(worker1)) {
-                Ok(r) => r,
-                Err(panic) => {
-                    match panic.downcast::<&str>() {
-                        Ok(panic_msg) => {
-                            Err(format_err!("worker panicked: {}", panic_msg))
-                        }
-                        Err(_) => {
-                            Err(format_err!("worker panicked: unknown type."))
-                        }
-                    }
-                }
-            };
-
-            worker.log_result(&result);
-        });
-
-        Ok(upid_str)
-    }
-
-    /// create state from self and a result
-    pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
-        let warn_count = self.data.lock().unwrap().warn_count;
-
-        let endtime = proxmox::tools::time::epoch_i64();
-
-        if let Err(err) = result {
-            TaskState::Error { message: err.to_string(), endtime }
-        } else if warn_count > 0 {
-            TaskState::Warning { count: warn_count, endtime }
-        } else {
-            TaskState::OK { endtime }
-        }
-    }
-
-    /// Log task result, remove task from running list
-    pub fn log_result(&self, result: &Result<(), Error>) {
-        let state = self.create_state(result);
-        self.log(state.result_text());
-
-        WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
-        let _ = self.setup.update_active_workers(None);
-        proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
-    }
-
-    /// Log a message.
-    pub fn log<S: AsRef<str>>(&self, msg: S) {
-        let mut data = self.data.lock().unwrap();
-        data.logger.log(msg);
-    }
-
-    /// Log a message as warning.
-    pub fn warn<S: AsRef<str>>(&self, msg: S) {
-        let mut data = self.data.lock().unwrap();
-        data.logger.log(format!("WARN: {}", msg.as_ref()));
-        data.warn_count += 1;
-    }
-
-    /// Set progress indicator
-    pub fn progress(&self, progress: f64) {
-        if progress >= 0.0 && progress <= 1.0 {
-            let mut data = self.data.lock().unwrap();
-            data.progress = progress;
-        } else {
-           // fixme:  log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
-        }
-    }
-
-    /// Request abort
-    pub fn request_abort(&self) {
-        eprintln!("set abort flag for worker {}", self.upid);
-
-        let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst);
-        if !prev_abort { // log abort one time
-            self.log(format!("received abort request ..."));
-        }
-        // noitify listeners
-        let mut data = self.data.lock().unwrap();
-        loop {
-            match data.abort_listeners.pop() {
-                None => { break; },
-                Some(ch) => {
-                    let _ = ch.send(()); // ignore errors here
-                },
-            }
-        }
-    }
-
-    /// Test if abort was requested.
-    pub fn abort_requested(&self) -> bool {
-        self.abort_requested.load(Ordering::SeqCst)
-    }
-
-    /// Fail if abort was requested.
-    pub fn fail_on_abort(&self) -> Result<(), Error> {
-        if self.abort_requested() {
-            bail!("abort requested - aborting task");
-        }
-        Ok(())
-    }
-
-    /// Get a future which resolves on task abort
-    pub fn abort_future(&self) ->  oneshot::Receiver<()> {
-        let (tx, rx) = oneshot::channel::<()>();
-
-        let mut data = self.data.lock().unwrap();
-        if self.abort_requested() {
-            let _ = tx.send(());
-        } else {
-            data.abort_listeners.push(tx);
-        }
-        rx
-    }
-
-    pub fn upid(&self) -> &UPID {
-        &self.upid
-    }
-}
-
-impl pbs_datastore::task::TaskState for WorkerTask {
-    fn check_abort(&self) -> Result<(), Error> {
-        self.fail_on_abort()
-    }
-
-    fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
-        match level {
-            log::Level::Error => self.warn(&message.to_string()),
-            log::Level::Warn => self.warn(&message.to_string()),
-            log::Level::Info => self.log(&message.to_string()),
-            log::Level::Debug => self.log(&format!("DEBUG: {}", message)),
-            log::Level::Trace => self.log(&format!("TRACE: {}", message)),
-        }
-    }
-}
-
-/// Wait for a locally spanned worker task
-///
-/// Note: local workers should print logs to stdout, so there is no
-/// need to fetch/display logs. We just wait for the worker to finish.
-pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
-
-    let upid: UPID = upid_str.parse()?;
-
-    let sleep_duration = core::time::Duration::new(0, 100_000_000);
-
-    loop {
-        if worker_is_active_local(&upid) {
-            tokio::time::sleep(sleep_duration).await;
-        } else {
-            break;
-        }
-    }
-    Ok(())
-}
-
-/// Request abort of a local worker (if existing and running)
-pub fn abort_local_worker(upid: UPID) {
-    if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) {
-        worker.request_abort();
-    }
-}
index ef5ffdbf230b9f44ea72a1cf5ec9e443a3fab166..e8e60d1978cc5b7c69e6bbaebb61671bbaf47e17 100644 (file)
@@ -30,19 +30,16 @@ use proxmox::{
 
 use pbs_api_types::{VirtualTapeDrive, LtoTapeDrive, Fingerprint};
 use pbs_config::key_config::KeyConfig;
-use pbs_datastore::task::TaskState;
-use pbs_datastore::task_log;
+use pbs_tools::{task_log, task::TaskState};
 
 use pbs_tape::{
     TapeWrite, TapeRead, BlockReadError, MediaContentHeader,
     sg_tape::TapeAlertFlags,
 };
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
-    server::{
-        send_load_media_email,
-        WorkerTask,
-    },
+    server::send_load_media_email,
     tape::{
         MediaId,
         drive::{
index 8042de9e22fc78a5d7dd3353d6301115e9c95607..2984173f8cafc7786aff4ad073848af793c6cbe7 100644 (file)
@@ -13,16 +13,16 @@ use anyhow::{bail, Error};
 
 use proxmox::tools::Uuid;
 
-use pbs_datastore::task_log;
+use pbs_tools::task_log;
 use pbs_config::tape_encryption_keys::load_key_configs;
 use pbs_tape::{
     TapeWrite,
     sg_tape::tape_alert_flags_critical,
 };
+use proxmox_rest_server::WorkerTask;
 
 use crate::{
     backup::{DataStore, SnapshotReader},
-    server::WorkerTask,
     tape::{
         TAPE_STATUS_DIR,
         MAX_CHUNK_ARCHIVE_SIZE,
index 7271ea558a23fa7e9039d1aa093e2fb1c57325fb..08b8fa9e56a88e2e42c2b806049a250f81f55ead 100644 (file)
@@ -6,13 +6,13 @@ extern crate tokio;
 extern crate nix;
 
 use proxmox::try_block;
+use proxmox::tools::fs::CreateOptions;
 
 use pbs_api_types::{Authid, UPID};
 
-use proxmox_rest_server::{flog, CommandoSocket};
-use proxmox_backup::server;
+use proxmox_rest_server::{flog, CommandoSocket, WorkerTask};
 
-fn garbage_collection(worker: &server::WorkerTask) -> Result<(), Error> {
+fn garbage_collection(worker: &WorkerTask) -> Result<(), Error> {
 
     worker.log("start garbage collection");
 
@@ -33,9 +33,12 @@ fn garbage_collection(worker: &server::WorkerTask) -> Result<(), Error> {
 #[test]
 #[ignore]
 fn worker_task_abort() -> Result<(), Error> {
-
-    server::create_task_log_dirs()?;
-
+    let uid = nix::unistd::Uid::current();
+    let gid = nix::unistd::Gid::current();
+        
+    let file_opts = CreateOptions::new().owner(uid).group(gid);
+    proxmox_rest_server::init_worker_tasks("./target/tasklogtestdir".into(), file_opts.clone())?;
     use std::sync::{Arc, Mutex};
 
     let errmsg: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
@@ -44,10 +47,11 @@ fn worker_task_abort() -> Result<(), Error> {
     let rt = tokio::runtime::Runtime::new().unwrap();
     rt.block_on(async move {
 
-        let mut commando_sock = CommandoSocket::new(server::our_ctrl_sock(), nix::unistd::Gid::current());
+        let mut commando_sock = CommandoSocket::new(
+            proxmox_rest_server::our_ctrl_sock(), nix::unistd::Gid::current());
 
         let init_result: Result<(), Error> = try_block!({
-            server::register_task_control_commands(&mut commando_sock)?;
+            proxmox_rest_server::register_task_control_commands(&mut commando_sock)?;
             proxmox_rest_server::server_state_init()?;
             Ok(())
         });
@@ -63,10 +67,10 @@ fn worker_task_abort() -> Result<(), Error> {
         }
 
         let errmsg = errmsg1.clone();
-        let res = server::WorkerTask::new_thread(
+        let res = WorkerTask::new_thread(
             "garbage_collection",
             None,
-            Authid::root_auth_id().clone(),
+            Authid::root_auth_id().to_string(),
             true,
             move |worker| {
                 println!("WORKER {}", worker);
@@ -91,8 +95,8 @@ fn worker_task_abort() -> Result<(), Error> {
             }
             Ok(wid) => {
                 println!("WORKER: {}", wid);
-                server::abort_worker_async(wid.parse::<UPID>().unwrap());
-                server::wait_for_local_worker(&wid).await.unwrap();
+                proxmox_rest_server::abort_worker_async(wid.parse::<UPID>().unwrap());
+                proxmox_rest_server::wait_for_local_worker(&wid).await.unwrap();
              }
         }
     });