Also moved pbs-datastore/src/task.rs to pbs-tools, which now depends on 'log'.
use pbs_api_types::GarbageCollectionStatus;
use pbs_tools::process_locker::{self, ProcessLocker};
+use pbs_tools::{task_log, task::TaskState};
use crate::DataBlob;
-use crate::task_log;
-use crate::task::TaskState;
/// File system based chunk store
pub struct ChunkStore {
for (entry, percentage, bad) in self.get_chunk_iterator()? {
if last_percentage != percentage {
last_percentage = percentage;
- crate::task_log!(
+ task_log!(
worker,
"processed {}% ({} chunks)",
percentage,
pub mod prune;
pub mod read_chunk;
pub mod store_progress;
-pub mod task;
pub mod dynamic_index;
pub mod fixed_index;
+++ /dev/null
-use anyhow::Error;
-
-/// `WorkerTask` methods commonly used from contexts otherwise not related to the API server.
-pub trait TaskState {
- /// If the task should be aborted, this should fail with a reasonable error message.
- fn check_abort(&self) -> Result<(), Error>;
-
- /// Create a log message for this task.
- fn log(&self, level: log::Level, message: &std::fmt::Arguments);
-}
-
-/// Convenience implementation:
-impl<T: TaskState + ?Sized> TaskState for std::sync::Arc<T> {
- fn check_abort(&self) -> Result<(), Error> {
- <T as TaskState>::check_abort(&*self)
- }
-
- fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
- <T as TaskState>::log(&*self, level, message)
- }
-}
-
-#[macro_export]
-macro_rules! task_error {
- ($task:expr, $($fmt:tt)+) => {{
- $crate::task::TaskState::log(&*$task, log::Level::Error, &format_args!($($fmt)+))
- }};
-}
-
-#[macro_export]
-macro_rules! task_warn {
- ($task:expr, $($fmt:tt)+) => {{
- $crate::task::TaskState::log(&*$task, log::Level::Warn, &format_args!($($fmt)+))
- }};
-}
-
-#[macro_export]
-macro_rules! task_log {
- ($task:expr, $($fmt:tt)+) => {{
- $crate::task::TaskState::log(&*$task, log::Level::Info, &format_args!($($fmt)+))
- }};
-}
-
-#[macro_export]
-macro_rules! task_debug {
- ($task:expr, $($fmt:tt)+) => {{
- $crate::task::TaskState::log(&*$task, log::Level::Debug, &format_args!($($fmt)+))
- }};
-}
-
-#[macro_export]
-macro_rules! task_trace {
- ($task:expr, $($fmt:tt)+) => {{
- $crate::task::TaskState::log(&*$task, log::Level::Trace, &format_args!($($fmt)+))
- }};
-}
futures = "0.3"
lazy_static = "1.4"
libc = "0.2"
+log = "0.4"
nix = "0.19.1"
nom = "5.1"
openssl = "0.10"
pub mod stream;
pub mod sync;
pub mod sys;
+pub mod task;
pub mod ticket;
pub mod tokio;
pub mod xattr;
--- /dev/null
+use anyhow::Error;
+
+/// `WorkerTask` methods commonly used from contexts otherwise not related to the API server.
+pub trait TaskState {
+ /// If the task should be aborted, this should fail with a reasonable error message.
+ fn check_abort(&self) -> Result<(), Error>;
+
+ /// Create a log message for this task.
+ fn log(&self, level: log::Level, message: &std::fmt::Arguments);
+}
+
+/// Convenience implementation:
+impl<T: TaskState + ?Sized> TaskState for std::sync::Arc<T> {
+ fn check_abort(&self) -> Result<(), Error> {
+ <T as TaskState>::check_abort(&*self)
+ }
+
+ fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
+ <T as TaskState>::log(&*self, level, message)
+ }
+}
+
+#[macro_export]
+macro_rules! task_error {
+ ($task:expr, $($fmt:tt)+) => {{
+ $crate::task::TaskState::log(&*$task, log::Level::Error, &format_args!($($fmt)+))
+ }};
+}
+
+#[macro_export]
+macro_rules! task_warn {
+ ($task:expr, $($fmt:tt)+) => {{
+ $crate::task::TaskState::log(&*$task, log::Level::Warn, &format_args!($($fmt)+))
+ }};
+}
+
+#[macro_export]
+macro_rules! task_log {
+ ($task:expr, $($fmt:tt)+) => {{
+ $crate::task::TaskState::log(&*$task, log::Level::Info, &format_args!($($fmt)+))
+ }};
+}
+
+#[macro_export]
+macro_rules! task_debug {
+ ($task:expr, $($fmt:tt)+) => {{
+ $crate::task::TaskState::log(&*$task, log::Level::Debug, &format_args!($($fmt)+))
+ }};
+}
+
+#[macro_export]
+macro_rules! task_trace {
+ ($task:expr, $($fmt:tt)+) => {{
+ $crate::task::TaskState::log(&*$task, log::Level::Trace, &format_args!($($fmt)+))
+ }};
+}
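
As an aside, not part of the patch itself: once a type implements the new `pbs_tools::task::TaskState` trait, these exported macros can be used from any crate that depends on pbs-tools (and on `log`, since the expansions reference `log::Level`). A minimal, hypothetical sketch:

    use anyhow::Error;
    use pbs_tools::{task_log, task_warn, task::TaskState};

    // Hypothetical task type that just writes to stderr.
    struct StderrTask;

    impl TaskState for StderrTask {
        fn check_abort(&self) -> Result<(), Error> {
            Ok(()) // this sketch never aborts
        }
        fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
            eprintln!("[{}] {}", level, message);
        }
    }

    fn example(task: &StderrTask) {
        // expands to TaskState::log(&*task, log::Level::Info, ...)
        task_log!(task, "processed {}% ({} chunks)", 50, 1024);
        task_warn!(task, "chunk {} looks damaged", 7);
    }
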
libc = "0.2"
log = "0.4"
nix = "0.19.1"
+once_cell = "1.3.1"
percent-encoding = "2.1"
regex = "1.2"
serde = { version = "1.0", features = [] }
use std::os::unix::io::RawFd;
use anyhow::{bail, format_err, Error};
+use nix::unistd::Pid;
use proxmox::tools::fd::Fd;
+use proxmox::sys::linux::procfs::PidStat;
use proxmox::api::UserInformation;
+use proxmox::tools::fs::CreateOptions;
mod compression;
pub use compression::*;
mod rest;
pub use rest::{RestServer, handle_api_request};
+mod worker_task;
+pub use worker_task::*;
+
pub enum AuthError {
Generic(Error),
NoData,
) -> Result<(String, Box<dyn UserInformation + Sync + Send>), AuthError>;
}
+lazy_static::lazy_static!{
+ static ref PID: i32 = unsafe { libc::getpid() };
+ static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime;
+}
+
+pub fn pid() -> i32 {
+ *PID
+}
+
+pub fn pstart() -> u64 {
+ *PSTART
+}
+
+pub fn write_pid(pid_fn: &str) -> Result<(), Error> {
+ let pid_str = format!("{}\n", *PID);
+ proxmox::tools::fs::replace_file(pid_fn, pid_str.as_bytes(), CreateOptions::new())
+}
+
+pub fn read_pid(pid_fn: &str) -> Result<i32, Error> {
+ let pid = proxmox::tools::fs::file_get_contents(pid_fn)?;
+ let pid = std::str::from_utf8(&pid)?.trim();
+ pid.parse().map_err(|err| format_err!("could not parse pid - {}", err))
+}
+
+pub fn ctrl_sock_from_pid(pid: i32) -> String {
+ // Note: The control socket always uses @/run/proxmox-backup/ as prefix
+    // for historic reasons.
+ format!("\0{}/control-{}.sock", "/run/proxmox-backup", pid)
+}
+
+pub fn our_ctrl_sock() -> String {
+ ctrl_sock_from_pid(*PID)
+}
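
A small aside, not part of the patch: the leading NUL byte places the control socket in the Linux abstract socket namespace, so no file is created on disk. For a hypothetical pid 1234:

    assert_eq!(
        ctrl_sock_from_pid(1234),
        "\0/run/proxmox-backup/control-1234.sock",
    );
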
+
static mut SHUTDOWN_REQUESTED: bool = false;
pub fn request_shutdown() {
--- /dev/null
+use std::collections::{HashMap, VecDeque};
+use std::fs::File;
+use std::path::PathBuf;
+use std::io::{Read, Write, BufRead, BufReader};
+use std::panic::UnwindSafe;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Arc, Mutex};
+
+use anyhow::{bail, format_err, Error};
+use futures::*;
+use lazy_static::lazy_static;
+use serde_json::{json, Value};
+use serde::{Serialize, Deserialize};
+use tokio::sync::oneshot;
+use nix::fcntl::OFlag;
+use once_cell::sync::OnceCell;
+
+use proxmox::sys::linux::procfs;
+use proxmox::try_block;
+use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
+use proxmox::api::upid::UPID;
+
+use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
+
+use crate::{CommandoSocket, FileLogger, FileLogOptions};
+
+struct TaskListLockGuard(File);
+
+struct WorkerTaskSetup {
+ file_opts: CreateOptions,
+ taskdir: PathBuf,
+ task_lock_fn: PathBuf,
+ active_tasks_fn: PathBuf,
+ task_index_fn: PathBuf,
+ task_archive_fn: PathBuf,
+}
+
+static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
+
+fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
+ WORKER_TASK_SETUP.get()
+ .ok_or_else(|| format_err!("WorkerTask library is not initialized"))
+}
+
+impl WorkerTaskSetup {
+
+ fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {
+
+ let mut taskdir = basedir.clone();
+ taskdir.push("tasks");
+
+ let mut task_lock_fn = taskdir.clone();
+ task_lock_fn.push(".active.lock");
+
+ let mut active_tasks_fn = taskdir.clone();
+ active_tasks_fn.push("active");
+
+ let mut task_index_fn = taskdir.clone();
+ task_index_fn.push("index");
+
+ let mut task_archive_fn = taskdir.clone();
+ task_archive_fn.push("archive");
+
+ Self {
+ file_opts,
+ taskdir,
+ task_lock_fn,
+ active_tasks_fn,
+ task_index_fn,
+ task_archive_fn,
+ }
+ }
+
+ fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {
+ let options = self.file_opts.clone()
+ .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+ let timeout = std::time::Duration::new(10, 0);
+
+ let file = proxmox::tools::fs::open_file_locked(
+ &self.task_lock_fn,
+ timeout,
+ exclusive,
+ options,
+ )?;
+
+ Ok(TaskListLockGuard(file))
+ }
+
+ fn log_path(&self, upid: &UPID) -> std::path::PathBuf {
+ let mut path = self.taskdir.clone();
+ path.push(format!("{:02X}", upid.pstart % 256));
+ path.push(upid.to_string());
+ path
+ }
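
For illustration only (values made up): task logs are spread over 256 subdirectories keyed by the low byte of `pstart`, so a UPID with `pstart = 0x1A3C` ends up at:

    // upid.pstart = 0x1A3C  ->  0x1A3C % 256 == 0x3C
    // log file: <basedir>/tasks/3C/<full UPID string>
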
+
+ // atomically read/update the task list, update status of finished tasks
+ // new_upid is added to the list when specified.
+ fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {
+
+ let lock = self.lock_task_list_files(true)?;
+
+ // TODO remove with 1.x
+ let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?;
+ let had_index_file = !finish_list.is_empty();
+
+ // We use filter_map because one negative case wants to *move* the data into `finish_list`,
+ // clippy doesn't quite catch this!
+ #[allow(clippy::unnecessary_filter_map)]
+ let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)?
+ .into_iter()
+ .filter_map(|info| {
+ if info.state.is_some() {
+ // this can happen when the active file still includes finished tasks
+ finish_list.push(info);
+ return None;
+ }
+
+ if !worker_is_active_local(&info.upid) {
+ // println!("Detected stopped task '{}'", &info.upid_str);
+ let now = proxmox::tools::time::epoch_i64();
+ let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
+ finish_list.push(TaskListInfo {
+ upid: info.upid,
+ upid_str: info.upid_str,
+ state: Some(status)
+ });
+ return None;
+ }
+
+ Some(info)
+ }).collect();
+
+ if let Some(upid) = new_upid {
+ active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
+ }
+
+ let active_raw = render_task_list(&active_list);
+
+ let options = self.file_opts.clone()
+ .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+ replace_file(
+ &self.active_tasks_fn,
+ active_raw.as_bytes(),
+ options,
+ )?;
+
+ finish_list.sort_unstable_by(|a, b| {
+ match (&a.state, &b.state) {
+ (Some(s1), Some(s2)) => s1.cmp(&s2),
+ (Some(_), None) => std::cmp::Ordering::Less,
+ (None, Some(_)) => std::cmp::Ordering::Greater,
+ _ => a.upid.starttime.cmp(&b.upid.starttime),
+ }
+ });
+
+ if !finish_list.is_empty() {
+ let options = self.file_opts.clone()
+ .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
+
+ let mut writer = atomic_open_or_create_file(
+ &self.task_archive_fn,
+ OFlag::O_APPEND | OFlag::O_RDWR,
+ &[],
+ options,
+ )?;
+ for info in &finish_list {
+ writer.write_all(render_task_line(&info).as_bytes())?;
+ }
+ }
+
+ // TODO Remove with 1.x
+ // for compatibility, if we had an INDEX file, we do not need it anymore
+ if had_index_file {
+ let _ = nix::unistd::unlink(&self.task_index_fn);
+ }
+
+ drop(lock);
+
+ Ok(())
+ }
+
+ // Create task log directory with correct permissions
+ fn create_task_log_dirs(&self) -> Result<(), Error> {
+
+ try_block!({
+ let dir_opts = self.file_opts.clone()
+ .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
+
+ create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?;
+ // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
+ Ok(())
+ }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
+ }
+}
+
+/// Initialize the WorkerTask library
+pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
+ let setup = WorkerTaskSetup::new(basedir, file_opts);
+ setup.create_task_log_dirs()?;
+ WORKER_TASK_SETUP.set(setup)
+ .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
+}
+
+/// Checks if the task archive is bigger than 'size_threshold' bytes, and
+/// rotates it if it is.
+pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
+
+ let setup = worker_task_setup()?;
+
+ let _lock = setup.lock_task_list_files(true)?;
+
+ let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress)
+ .ok_or_else(|| format_err!("could not get archive file names"))?;
+
+ logrotate.rotate(size_threshold, None, max_files)
+}
+
+
+/// Path to the worker log file
+pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
+ let setup = worker_task_setup()?;
+ Ok(setup.log_path(upid))
+}
+
+/// Read endtime (time of last log line) and exit status from the task log file.
+/// If there is not a single line with a valid datetime, we assume the
+/// starttime to be the endtime.
+pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
+
+ let setup = worker_task_setup()?;
+
+ let mut status = TaskState::Unknown { endtime: upid.starttime };
+
+ let path = setup.log_path(upid);
+
+ let mut file = File::open(path)?;
+
+    // speedup - only read tail
+ use std::io::Seek;
+ use std::io::SeekFrom;
+ let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
+
+ let mut data = Vec::with_capacity(8192);
+ file.read_to_end(&mut data)?;
+
+ // strip newlines at the end of the task logs
+ while data.last() == Some(&b'\n') {
+ data.pop();
+ }
+
+ let last_line = match data.iter().rposition(|c| *c == b'\n') {
+ Some(start) if data.len() > (start+1) => &data[start+1..],
+ Some(_) => &data, // should not happen, since we removed all trailing newlines
+ None => &data,
+ };
+
+ let last_line = std::str::from_utf8(last_line)
+ .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
+
+ let mut iter = last_line.splitn(2, ": ");
+ if let Some(time_str) = iter.next() {
+ if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
+ // set the endtime even if we cannot parse the state
+ status = TaskState::Unknown { endtime };
+ if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
+ if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
+ status = state;
+ }
+ }
+ }
+ }
+
+ Ok(status)
+}
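
For reference, not part of the patch (timestamps made up): the last line parsed here is the status line that `WorkerTask::log_result()` appends further down, in the form `<RFC 3339 timestamp>: TASK <status>`, for example:

    2021-09-07T08:15:30Z: TASK OK
    2021-09-07T08:15:30Z: TASK WARNINGS: 2
    2021-09-07T08:15:30Z: TASK ERROR: connection refused

Everything after `TASK ` is handed to `TaskState::from_endtime_and_message()`.
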
+
+lazy_static! {
+ static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
+}
+
+/// checks if the task UPID refers to a worker from this process
+fn is_local_worker(upid: &UPID) -> bool {
+ upid.pid == crate::pid() && upid.pstart == crate::pstart()
+}
+
+/// Test if the task is still running
+pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
+ if is_local_worker(upid) {
+ return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
+ }
+
+ if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() {
+ return Ok(false);
+ }
+
+ let sock = crate::ctrl_sock_from_pid(upid.pid);
+ let cmd = json!({
+ "command": "worker-task-status",
+ "args": {
+ "upid": upid.to_string(),
+ },
+ });
+ let status = crate::send_command(sock, &cmd).await?;
+
+ if let Some(active) = status.as_bool() {
+ Ok(active)
+ } else {
+ bail!("got unexpected result {:?} (expected bool)", status);
+ }
+}
+
+/// Test if the task is still running (fast but inaccurate implementation)
+///
+/// If the task is spawned from a different process, we simply return whether
+/// that process is still running. This information is good enough to detect
+/// stale tasks...
+pub fn worker_is_active_local(upid: &UPID) -> bool {
+ if is_local_worker(upid) {
+ WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
+ } else {
+ procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
+ }
+}
+
+pub fn register_task_control_commands(
+ commando_sock: &mut CommandoSocket,
+) -> Result<(), Error> {
+ fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
+ let args = if let Some(args) = args { args } else { bail!("missing args") };
+ let upid = match args.get("upid") {
+ Some(Value::String(upid)) => upid.parse::<UPID>()?,
+ None => bail!("no upid in args"),
+ _ => bail!("unable to parse upid"),
+ };
+ if !is_local_worker(&upid) {
+ bail!("upid does not belong to this process");
+ }
+ Ok(upid)
+ }
+
+ commando_sock.register_command("worker-task-abort".into(), move |args| {
+ let upid = get_upid(args)?;
+
+ abort_local_worker(upid);
+
+ Ok(Value::Null)
+ })?;
+ commando_sock.register_command("worker-task-status".into(), move |args| {
+ let upid = get_upid(args)?;
+
+ let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id);
+
+ Ok(active.into())
+ })?;
+
+ Ok(())
+}
+
+pub fn abort_worker_async(upid: UPID) {
+ tokio::spawn(async move {
+ if let Err(err) = abort_worker(upid).await {
+ eprintln!("abort worker failed - {}", err);
+ }
+ });
+}
+
+pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
+
+ let sock = crate::ctrl_sock_from_pid(upid.pid);
+ let cmd = json!({
+ "command": "worker-task-abort",
+ "args": {
+ "upid": upid.to_string(),
+ },
+ });
+ crate::send_command(sock, &cmd).map_ok(|_| ()).await
+}
+
+fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
+
+ let data = line.splitn(3, ' ').collect::<Vec<&str>>();
+
+ let len = data.len();
+
+ match len {
+ 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)),
+ 3 => {
+ let endtime = i64::from_str_radix(data[1], 16)?;
+ let state = TaskState::from_endtime_and_message(endtime, data[2])?;
+ Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state)))
+ }
+ _ => bail!("wrong number of components"),
+ }
+}
+
+/// Task State
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
+pub enum TaskState {
+ /// The Task ended with an undefined state
+ Unknown { endtime: i64 },
+ /// The Task ended and there were no errors or warnings
+ OK { endtime: i64 },
+    /// The Task had 'count' warnings and no errors
+ Warning { count: u64, endtime: i64 },
+ /// The Task ended with the error described in 'message'
+ Error { message: String, endtime: i64 },
+}
+
+impl TaskState {
+ pub fn endtime(&self) -> i64 {
+ match *self {
+ TaskState::Unknown { endtime } => endtime,
+ TaskState::OK { endtime } => endtime,
+ TaskState::Warning { endtime, .. } => endtime,
+ TaskState::Error { endtime, .. } => endtime,
+ }
+ }
+
+ fn result_text(&self) -> String {
+ match self {
+ TaskState::Error { message, .. } => format!("TASK ERROR: {}", message),
+ other => format!("TASK {}", other),
+ }
+ }
+
+ fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> {
+ if s == "unknown" {
+ Ok(TaskState::Unknown { endtime })
+ } else if s == "OK" {
+ Ok(TaskState::OK { endtime })
+ } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") {
+ let count: u64 = warnings.parse()?;
+ Ok(TaskState::Warning{ count, endtime })
+ } else if !s.is_empty() {
+ let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string();
+ Ok(TaskState::Error{ message, endtime })
+ } else {
+ bail!("unable to parse Task Status '{}'", s);
+ }
+ }
+}
+
+impl std::cmp::PartialOrd for TaskState {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.endtime().cmp(&other.endtime()))
+ }
+}
+
+impl std::cmp::Ord for TaskState {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ self.endtime().cmp(&other.endtime())
+ }
+}
+
+impl std::fmt::Display for TaskState {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ TaskState::Unknown { .. } => write!(f, "unknown"),
+ TaskState::OK { .. }=> write!(f, "OK"),
+ TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count),
+ TaskState::Error { message, .. } => write!(f, "{}", message),
+ }
+ }
+}
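
A quick sketch, not part of the patch, of how `from_endtime_and_message()` and the `Display` impl above round-trip the status column of an archive line:

    let state = TaskState::from_endtime_and_message(1630000000, "WARNINGS: 2")?;
    assert_eq!(state, TaskState::Warning { count: 2, endtime: 1630000000 });
    assert_eq!(state.to_string(), "WARNINGS: 2");
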
+
+/// Task details including parsed UPID
+///
+/// If there is no `state`, the task is still running.
+#[derive(Debug)]
+pub struct TaskListInfo {
+ /// The parsed UPID
+ pub upid: UPID,
+ /// UPID string representation
+ pub upid_str: String,
+ /// Task `(endtime, status)` if already finished
+ pub state: Option<TaskState>, // endtime, status
+}
+
+fn render_task_line(info: &TaskListInfo) -> String {
+ let mut raw = String::new();
+ if let Some(status) = &info.state {
+ raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status));
+ } else {
+ raw.push_str(&info.upid_str);
+ raw.push('\n');
+ }
+
+ raw
+}
+
+fn render_task_list(list: &[TaskListInfo]) -> String {
+ let mut raw = String::new();
+ for info in list {
+ raw.push_str(&render_task_line(&info));
+ }
+ raw
+}
+
+// note this is not locked, caller has to make sure it is
+// this will skip (and log) lines that are not valid status lines
+fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error>
+{
+ let reader = BufReader::new(reader);
+ let mut list = Vec::new();
+ for line in reader.lines() {
+ let line = line?;
+ match parse_worker_status_line(&line) {
+ Ok((upid_str, upid, state)) => list.push(TaskListInfo {
+ upid_str,
+ upid,
+ state
+ }),
+ Err(err) => {
+ eprintln!("unable to parse worker status '{}' - {}", line, err);
+ continue;
+ }
+ };
+ }
+
+ Ok(list)
+}
+
+// note this is not locked, caller has to make sure it is
+fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error>
+where
+ P: AsRef<std::path::Path> + std::fmt::Debug,
+{
+ let file = match File::open(&path) {
+ Ok(f) => f,
+ Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
+ Err(err) => bail!("unable to open task list {:?} - {}", path, err),
+ };
+
+ read_task_file(file)
+}
+
+pub struct TaskListInfoIterator {
+ list: VecDeque<TaskListInfo>,
+ end: bool,
+ archive: Option<LogRotateFiles>,
+ lock: Option<TaskListLockGuard>,
+}
+
+impl TaskListInfoIterator {
+ pub fn new(active_only: bool) -> Result<Self, Error> {
+
+ let setup = worker_task_setup()?;
+
+ let (read_lock, active_list) = {
+ let lock = setup.lock_task_list_files(false)?;
+ let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
+
+ let needs_update = active_list
+ .iter()
+ .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
+
+ // TODO remove with 1.x
+ let index_exists = setup.task_index_fn.is_file();
+
+ if needs_update || index_exists {
+ drop(lock);
+ setup.update_active_workers(None)?;
+ let lock = setup.lock_task_list_files(false)?;
+ let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
+ (lock, active_list)
+ } else {
+ (lock, active_list)
+ }
+ };
+
+ let archive = if active_only {
+ None
+ } else {
+ let logrotate = LogRotate::new(&setup.task_archive_fn, true)
+ .ok_or_else(|| format_err!("could not get archive file names"))?;
+ Some(logrotate.files())
+ };
+
+ let lock = if active_only { None } else { Some(read_lock) };
+
+ Ok(Self {
+ list: active_list.into(),
+ end: active_only,
+ archive,
+ lock,
+ })
+ }
+}
+
+impl Iterator for TaskListInfoIterator {
+ type Item = Result<TaskListInfo, Error>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ loop {
+ if let Some(element) = self.list.pop_back() {
+ return Some(Ok(element));
+ } else if self.end {
+ return None;
+ } else {
+ if let Some(mut archive) = self.archive.take() {
+ if let Some(file) = archive.next() {
+ let list = match read_task_file(file) {
+ Ok(list) => list,
+ Err(err) => return Some(Err(err)),
+ };
+ self.list.append(&mut list.into());
+ self.archive = Some(archive);
+ continue;
+ }
+ }
+
+ self.end = true;
+ self.lock.take();
+ }
+ }
+ }
+}
+
+/// Launch long running worker tasks.
+///
+/// A worker task can either be a whole thread, or simply a tokio
+/// task/future. Each task can `log()` messages, which are stored
+/// persistently in files. Tasks should poll the `abort_requested`
+/// flag and stop execution when requested.
+pub struct WorkerTask {
+ setup: &'static WorkerTaskSetup,
+ upid: UPID,
+ data: Mutex<WorkerTaskData>,
+ abort_requested: AtomicBool,
+}
+
+impl std::fmt::Display for WorkerTask {
+
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ self.upid.fmt(f)
+ }
+}
+
+struct WorkerTaskData {
+ logger: FileLogger,
+ progress: f64, // 0..1
+ warn_count: u64,
+ pub abort_listeners: Vec<oneshot::Sender<()>>,
+}
+
+impl WorkerTask {
+
+ pub fn new(
+ worker_type: &str,
+ worker_id: Option<String>,
+ auth_id: String,
+ to_stdout: bool,
+ ) -> Result<Arc<Self>, Error> {
+
+ let setup = worker_task_setup()?;
+
+ let upid = UPID::new(worker_type, worker_id, auth_id)?;
+ let task_id = upid.task_id;
+
+ let mut path = setup.taskdir.clone();
+
+ path.push(format!("{:02X}", upid.pstart & 255));
+
+ let dir_opts = setup.file_opts.clone()
+ .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
+
+ create_path(&path, None, Some(dir_opts))?;
+
+ path.push(upid.to_string());
+
+ let logger_options = FileLogOptions {
+ to_stdout,
+ exclusive: true,
+ prefix_time: true,
+ read: true,
+ file_opts: setup.file_opts.clone(),
+ ..Default::default()
+ };
+ let logger = FileLogger::new(&path, logger_options)?;
+
+ let worker = Arc::new(Self {
+ setup,
+ upid: upid.clone(),
+ abort_requested: AtomicBool::new(false),
+ data: Mutex::new(WorkerTaskData {
+ logger,
+ progress: 0.0,
+ warn_count: 0,
+ abort_listeners: vec![],
+ }),
+ });
+
+ // scope to drop the lock again after inserting
+ {
+ let mut hash = WORKER_TASK_LIST.lock().unwrap();
+ hash.insert(task_id, worker.clone());
+ crate::set_worker_count(hash.len());
+ }
+
+ setup.update_active_workers(Some(&upid))?;
+
+ Ok(worker)
+ }
+
+ /// Spawn a new tokio task/future.
+ pub fn spawn<F, T>(
+ worker_type: &str,
+ worker_id: Option<String>,
+ auth_id: String,
+ to_stdout: bool,
+ f: F,
+ ) -> Result<String, Error>
+ where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
+ T: Send + 'static + Future<Output = Result<(), Error>>,
+ {
+ let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
+ let upid_str = worker.upid.to_string();
+ let f = f(worker.clone());
+ tokio::spawn(async move {
+ let result = f.await;
+ worker.log_result(&result);
+ });
+
+ Ok(upid_str)
+ }
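
As a hedged usage sketch, not part of the patch (worker type, id and auth id are made up): an API handler spawns a task roughly like this and returns the UPID string to the client:

    let upid_str = WorkerTask::spawn(
        "example-job",               // worker_type
        Some("store1".to_string()),  // worker_id
        "root@pam".to_string(),      // auth_id
        false,                       // to_stdout
        |worker| async move {
            worker.log("starting");
            worker.fail_on_abort()?; // cooperatively honor abort requests
            Ok(())
        },
    )?;
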
+
+ /// Create a new worker thread.
+ pub fn new_thread<F>(
+ worker_type: &str,
+ worker_id: Option<String>,
+ auth_id: String,
+ to_stdout: bool,
+ f: F,
+ ) -> Result<String, Error>
+ where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>
+ {
+ let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
+ let upid_str = worker.upid.to_string();
+
+ let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || {
+ let worker1 = worker.clone();
+ let result = match std::panic::catch_unwind(move || f(worker1)) {
+ Ok(r) => r,
+ Err(panic) => {
+ match panic.downcast::<&str>() {
+ Ok(panic_msg) => {
+ Err(format_err!("worker panicked: {}", panic_msg))
+ }
+ Err(_) => {
+ Err(format_err!("worker panicked: unknown type."))
+ }
+ }
+ }
+ };
+
+ worker.log_result(&result);
+ });
+
+ Ok(upid_str)
+ }
+
+ /// create state from self and a result
+ pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
+ let warn_count = self.data.lock().unwrap().warn_count;
+
+ let endtime = proxmox::tools::time::epoch_i64();
+
+ if let Err(err) = result {
+ TaskState::Error { message: err.to_string(), endtime }
+ } else if warn_count > 0 {
+ TaskState::Warning { count: warn_count, endtime }
+ } else {
+ TaskState::OK { endtime }
+ }
+ }
+
+ /// Log task result, remove task from running list
+ pub fn log_result(&self, result: &Result<(), Error>) {
+ let state = self.create_state(result);
+ self.log(state.result_text());
+
+ WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
+ let _ = self.setup.update_active_workers(None);
+ crate::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
+ }
+
+ /// Log a message.
+ pub fn log<S: AsRef<str>>(&self, msg: S) {
+ let mut data = self.data.lock().unwrap();
+ data.logger.log(msg);
+ }
+
+ /// Log a message as warning.
+ pub fn warn<S: AsRef<str>>(&self, msg: S) {
+ let mut data = self.data.lock().unwrap();
+ data.logger.log(format!("WARN: {}", msg.as_ref()));
+ data.warn_count += 1;
+ }
+
+ /// Set progress indicator
+ pub fn progress(&self, progress: f64) {
+ if progress >= 0.0 && progress <= 1.0 {
+ let mut data = self.data.lock().unwrap();
+ data.progress = progress;
+ } else {
+ // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
+ }
+ }
+
+ /// Request abort
+ pub fn request_abort(&self) {
+ eprintln!("set abort flag for worker {}", self.upid);
+
+ let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst);
+ if !prev_abort { // log abort one time
+            self.log("received abort request ...");
+ }
+        // notify listeners
+ let mut data = self.data.lock().unwrap();
+ loop {
+ match data.abort_listeners.pop() {
+ None => { break; },
+ Some(ch) => {
+ let _ = ch.send(()); // ignore errors here
+ },
+ }
+ }
+ }
+
+ /// Test if abort was requested.
+ pub fn abort_requested(&self) -> bool {
+ self.abort_requested.load(Ordering::SeqCst)
+ }
+
+ /// Fail if abort was requested.
+ pub fn fail_on_abort(&self) -> Result<(), Error> {
+ if self.abort_requested() {
+ bail!("abort requested - aborting task");
+ }
+ Ok(())
+ }
+
+ /// Get a future which resolves on task abort
+ pub fn abort_future(&self) -> oneshot::Receiver<()> {
+ let (tx, rx) = oneshot::channel::<()>();
+
+ let mut data = self.data.lock().unwrap();
+ if self.abort_requested() {
+ let _ = tx.send(());
+ } else {
+ data.abort_listeners.push(tx);
+ }
+ rx
+ }
+
+ pub fn upid(&self) -> &UPID {
+ &self.upid
+ }
+}
+
+impl pbs_tools::task::TaskState for WorkerTask {
+ fn check_abort(&self) -> Result<(), Error> {
+ self.fail_on_abort()
+ }
+
+ fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
+ match level {
+ log::Level::Error => self.warn(&message.to_string()),
+ log::Level::Warn => self.warn(&message.to_string()),
+ log::Level::Info => self.log(&message.to_string()),
+ log::Level::Debug => self.log(&format!("DEBUG: {}", message)),
+ log::Level::Trace => self.log(&format!("TRACE: {}", message)),
+ }
+ }
+}
+
+/// Wait for a locally spawned worker task
+///
+/// Note: local workers should print logs to stdout, so there is no
+/// need to fetch/display logs. We just wait for the worker to finish.
+pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
+
+ let upid: UPID = upid_str.parse()?;
+
+ let sleep_duration = core::time::Duration::new(0, 100_000_000);
+
+ loop {
+ if worker_is_active_local(&upid) {
+ tokio::time::sleep(sleep_duration).await;
+ } else {
+ break;
+ }
+ }
+ Ok(())
+}
+
+/// Request abort of a local worker (if existing and running)
+pub fn abort_local_worker(upid: UPID) {
+ if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) {
+ worker.request_abort();
+ }
+}
use crate::acme::AcmeClient;
use crate::api2::types::AcmeDomain;
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
use crate::config::acme::plugin::{DnsPlugin, PluginData};
use pbs_tools::stream::{AsyncReaderStream, AsyncChannelWriter};
use pbs_tools::json::{required_integer_param, required_string_param};
use pbs_config::CachedUserInfo;
-use proxmox_rest_server::formatter;
+use proxmox_rest_server::{WorkerTask, formatter};
use crate::api2::node::rrd::create_value_from_rrd;
use crate::backup::{
DataStore, LocalChunkReader,
};
-use crate::server::{jobstate::Job, WorkerTask};
+use crate::server::jobstate::Job;
const GROUP_NOTES_FILE_NAME: &str = "notes";
use pbs_datastore::dynamic_index::DynamicIndexWriter;
use pbs_datastore::fixed_index::FixedIndexWriter;
use pbs_api_types::Authid;
-use proxmox_rest_server::formatter::*;
+use proxmox_rest_server::{WorkerTask, formatter::*};
use crate::backup::{verify_backup_dir_with_lock, DataStore};
-use crate::server::WorkerTask;
+
use hyper::{Body, Response};
#[derive(Copy, Clone, Serialize)]
use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{archive_type, ArchiveType};
+use proxmox_rest_server::WorkerTask;
-use crate::server::{WorkerTask, H2Service};
+use crate::server::H2Service;
use crate::backup::DataStore;
use pbs_config::CachedUserInfo;
use crate::config::acme::plugin::{
self, DnsPlugin, DnsPluginCore, DnsPluginCoreUpdater, PLUGIN_ID_SCHEMA,
};
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
pub(crate) const ROUTER: Router = Router::new()
.get(&list_subdirs_api_method!(SUBDIRS))
use proxmox::api::schema::{ApiType, parse_property_string};
use pbs_datastore::chunk_store::ChunkStore;
-use pbs_datastore::task::TaskState;
use pbs_config::BackupLockGuard;
use pbs_api_types::{
Authid, DatastoreNotify,
PRIV_DATASTORE_ALLOCATE, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_MODIFY,
DataStoreConfig, DataStoreConfigUpdater,
};
+use pbs_tools::task::TaskState;
use crate::api2::config::sync::delete_sync_job;
use crate::api2::config::verify::delete_verification_job;
verify::list_verification_jobs,
};
use pbs_config::CachedUserInfo;
+use proxmox_rest_server::WorkerTask;
-use crate::server::{jobstate, WorkerTask};
+use crate::server::jobstate;
#[api(
input: {
};
use crate::config::node;
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
use crate::tools::{
apt,
pbs_simple_http,
use crate::acme::AcmeClient;
use crate::api2::types::AcmeDomain;
use crate::config::node::NodeConfig;
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
pub const ROUTER: Router = Router::new()
.get(&list_subdirs_api_method!(SUBDIRS))
};
use crate::tools::systemd::{self, types::*};
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
const BASE_MOUNT_DIR: &str = "/mnt/datastore/";
DiskUsageInfo, DiskUsageType, DiskManage, SmartData,
get_disks, get_smart_data, get_disk_usage_info, inititialize_gpt_disk,
};
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
pub mod directory;
pub mod zfs;
DiskUsageType,
};
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
#[api(
use pbs_tools::auth::private_auth_key;
use pbs_tools::ticket::{self, Empty, Ticket};
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
use crate::tools;
pub mod apt;
};
use pbs_config::network::{self, NetworkConfig};
-use crate::server::{WorkerTask};
+use proxmox_rest_server::WorkerTask;
fn split_interface_list(list: &str) -> Result<Vec<String>, Error> {
let value = parse_property_string(&list, &NETWORK_INTERFACE_ARRAY_SCHEMA)?;
use pbs_api_types::{Authid, NODE_SCHEMA, SERVICE_ID_SCHEMA, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY};
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
static SERVICE_NAME_LIST: [&str; 7] = [
"proxmox-backup",
};
use crate::api2::pull::check_pull_privs;
-use crate::server::{self, upid_log_path, upid_read_status, TaskState, TaskListInfoIterator};
+
+use proxmox_rest_server::{upid_log_path, upid_read_status, TaskState, TaskListInfoIterator};
use pbs_config::CachedUserInfo;
// matches respective job execution privileges
}
}
+fn into_task_list_item(info: proxmox_rest_server::TaskListInfo) -> pbs_api_types::TaskListItem {
+ let (endtime, status) = info
+ .state
+ .map_or_else(|| (None, None), |a| (Some(a.endtime()), Some(a.to_string())));
+
+ pbs_api_types::TaskListItem {
+ upid: info.upid_str,
+ node: "localhost".to_string(),
+ pid: info.upid.pid as i64,
+ pstart: info.upid.pstart,
+ starttime: info.upid.starttime,
+ worker_type: info.upid.worker_type,
+ worker_id: info.upid.worker_id,
+ user: info.upid.auth_id,
+ endtime,
+ status,
+ }
+}
+
#[api(
input: {
properties: {
result["tokenid"] = Value::from(task_auth_id.tokenname().unwrap().as_str());
}
- if crate::server::worker_is_active(&upid).await? {
+ if proxmox_rest_server::worker_is_active(&upid).await? {
result["status"] = Value::from("running");
} else {
let exitstatus = upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
rpcenv["total"] = Value::from(count);
if test_status {
- let active = crate::server::worker_is_active(&upid).await?;
+ let active = proxmox_rest_server::worker_is_active(&upid).await?;
rpcenv["active"] = Value::from(active);
}
user_info.check_privs(&auth_id, &["system", "tasks"], PRIV_SYS_MODIFY, false)?;
}
- server::abort_worker_async(upid);
+ proxmox_rest_server::abort_worker_async(upid);
Ok(Value::Null)
}
match (&info.state, &statusfilter) {
(Some(_), _) if running => continue,
- (Some(crate::server::TaskState::OK { .. }), _) if errors => continue,
+ (Some(TaskState::OK { .. }), _) if errors => continue,
(Some(state), Some(filters)) => {
if !filters.contains(&tasktype(state)) {
continue;
continue;
}
- result.push(info.into());
+ result.push(into_task_list_item(info));
if result.len() >= limit {
break;
DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ,
};
+use proxmox_rest_server::WorkerTask;
-use crate::server::{WorkerTask, jobstate::Job, pull::pull_store};
+use crate::server::{jobstate::Job, pull::pull_store};
use crate::backup::DataStore;
use pbs_config::CachedUserInfo;
use proxmox_rest_server::formatter::*;
use crate::backup::DataStore;
-use crate::server::WorkerTask;
+use proxmox_rest_server::WorkerTask;
//use proxmox::tools;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{archive_type, ArchiveType};
use pbs_config::CachedUserInfo;
+use proxmox_rest_server::WorkerTask;
use crate::{
api2::helpers,
backup::DataStore,
server::{
- WorkerTask,
H2Service,
},
};
UPID_SCHEMA, JOB_ID_SCHEMA, PRIV_DATASTORE_READ, PRIV_TAPE_AUDIT, PRIV_TAPE_WRITE,
};
-use pbs_datastore::{task_log, task_warn, StoreProgress};
+use pbs_datastore::StoreProgress;
use pbs_datastore::backup_info::{BackupDir, BackupInfo};
-use pbs_datastore::task::TaskState;
+use pbs_tools::{task_log, task_warn, task::TaskState};
use pbs_config::CachedUserInfo;
+use proxmox_rest_server::WorkerTask;
use crate::{
server::{
},
},
backup::{DataStore, SnapshotReader},
- server::WorkerTask,
tape::{
TAPE_STATUS_DIR,
Inventory,
LtoDriveAndMediaStatus, Lp17VolumeStatistics,
};
-use pbs_datastore::task_log;
use pbs_api_types::{PRIV_TAPE_AUDIT, PRIV_TAPE_READ, PRIV_TAPE_WRITE};
use pbs_config::CachedUserInfo;
use pbs_tape::{
sg_tape::tape_alert_flags_critical,
linux_list_drives::{lto_tape_device_list, lookup_device_identification, open_lto_tape_device},
};
+use pbs_tools::task_log;
+use proxmox_rest_server::WorkerTask;
use crate::{
api2::tape::restore::{
fast_catalog_restore,
restore_media,
},
- server::WorkerTask,
tape::{
TAPE_STATUS_DIR,
Inventory,
UPID_SCHEMA, TAPE_RESTORE_SNAPSHOT_SCHEMA,
PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_TAPE_READ,
};
-use pbs_datastore::{task_log, task_warn, DataBlob};
+use pbs_datastore::DataBlob;
use pbs_datastore::backup_info::BackupDir;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, MANIFEST_BLOB_NAME};
-use pbs_datastore::task::TaskState;
use pbs_config::CachedUserInfo;
use pbs_tape::{
TapeRead, BlockReadError, MediaContentHeader,
PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0,
};
+use pbs_tools::{task_log, task_warn, task::TaskState};
+use proxmox_rest_server::WorkerTask;
use crate::{
tools::ParallelHandler,
backup::DataStore,
- server::{
- lookup_user_email,
- WorkerTask,
- },
+ server::lookup_user_email,
tape::{
TAPE_STATUS_DIR,
MediaId,
use proxmox::tools::fs::{replace_file, file_read_optional_string, CreateOptions};
use pbs_api_types::{UPID, DataStoreConfig, Authid, GarbageCollectionStatus};
-use pbs_datastore::{task_log, task_warn};
use pbs_datastore::DataBlob;
use pbs_datastore::backup_info::{BackupGroup, BackupDir};
use pbs_datastore::chunk_store::ChunkStore;
ArchiveType, BackupManifest,
archive_type,
};
-use pbs_datastore::task::TaskState;
use pbs_tools::format::HumanByte;
use pbs_tools::fs::{lock_dir_noblock, DirLockGuard};
use pbs_tools::process_locker::ProcessLockSharedGuard;
+use pbs_tools::{task_log, task_warn, task::TaskState};
use pbs_config::{open_backup_lockfile, BackupLockGuard};
use proxmox_rest_server::fail_on_shutdown;
use anyhow::{bail, format_err, Error};
use pbs_api_types::{Authid, CryptMode, VerifyState, UPID, SnapshotVerifyState};
-use pbs_datastore::{task_log, DataBlob, StoreProgress};
+use pbs_datastore::{DataBlob, StoreProgress};
use pbs_datastore::backup_info::{BackupGroup, BackupDir, BackupInfo};
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{archive_type, ArchiveType, BackupManifest, FileInfo};
-use pbs_datastore::task::TaskState;
use pbs_tools::fs::lock_dir_noblock_shared;
+use pbs_tools::{task_log, task::TaskState};
use crate::{
backup::DataStore,
use proxmox::tools::fs::CreateOptions;
use pbs_tools::auth::private_auth_key;
-use proxmox_rest_server::{ApiConfig, RestServer};
-
-use proxmox_backup::server::{
- self,
- auth::default_api_auth,
-};
-use proxmox_rest_server::daemon;
+use proxmox_rest_server::{daemon, ApiConfig, RestServer};
+use proxmox_backup::server::auth::default_api_auth;
use proxmox_backup::auth_helpers::*;
use proxmox_backup::config;
)?;
let backup_user = pbs_config::backup_user()?;
- let mut commando_sock = proxmox_rest_server::CommandoSocket::new(crate::server::our_ctrl_sock(), backup_user.gid);
+ let mut commando_sock = proxmox_rest_server::CommandoSocket::new(proxmox_rest_server::our_ctrl_sock(), backup_user.gid);
let dir_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
let rest_server = RestServer::new(config);
- proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
+ proxmox_rest_server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
// http server future:
let server = daemon::create_daemon(
"proxmox-backup.service",
);
- server::write_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
+ proxmox_rest_server::write_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
let init_result: Result<(), Error> = try_block!({
- server::register_task_control_commands(&mut commando_sock)?;
+ proxmox_rest_server::register_task_control_commands(&mut commando_sock)?;
commando_sock.spawn()?;
proxmox_rest_server::server_state_init()?;
Ok(())
IGNORE_VERIFIED_BACKUPS_SCHEMA, VERIFICATION_OUTDATED_AFTER_SCHEMA,
};
+use proxmox_rest_server::wait_for_local_worker;
+
use proxmox_backup::config;
use proxmox_backup::api2;
-use proxmox_backup::server::wait_for_local_worker;
mod proxmox_backup_manager;
use proxmox_backup_manager::*;
use proxmox::sys::linux::socket::set_tcp_keepalive;
use proxmox::tools::fs::CreateOptions;
-use proxmox_rest_server::{ApiConfig, RestServer};
+use proxmox_rest_server::{rotate_task_log_archive, ApiConfig, RestServer, WorkerTask};
use proxmox_backup::{
backup::DataStore,
server::{
auth::default_api_auth,
- WorkerTask,
jobstate::{
self,
Job,
},
- rotate_task_log_archive,
},
};
config.register_template("console", "/usr/share/pve-xtermjs/index.html.hbs")?;
let backup_user = pbs_config::backup_user()?;
- let mut commando_sock = proxmox_rest_server::CommandoSocket::new(crate::server::our_ctrl_sock(), backup_user.gid);
+ let mut commando_sock = proxmox_rest_server::CommandoSocket::new(proxmox_rest_server::our_ctrl_sock(), backup_user.gid);
let dir_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
)?;
let rest_server = RestServer::new(config);
- proxmox_backup::server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
+ proxmox_rest_server::init_worker_tasks(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(), file_opts.clone())?;
//openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
"proxmox-backup-proxy.service",
);
- server::write_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
+ proxmox_rest_server::write_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
let init_result: Result<(), Error> = try_block!({
- server::register_task_control_commands(&mut commando_sock)?;
+ proxmox_rest_server::register_task_control_commands(&mut commando_sock)?;
commando_sock.spawn()?;
proxmox_rest_server::server_state_init()?;
Ok(())
async fn command_reopen_access_logfiles() -> Result<(), Error> {
// only care about the most recent daemon instance for each, proxy & api, as other older ones
// should not respond to new requests anyway, but only finish their current one and then exit.
- let sock = crate::server::our_ctrl_sock();
+ let sock = proxmox_rest_server::our_ctrl_sock();
let f1 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
- let pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
- let sock = crate::server::ctrl_sock_from_pid(pid);
+ let pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
+ let sock = proxmox_rest_server::ctrl_sock_from_pid(pid);
let f2 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
match futures::join!(f1, f2) {
async fn command_reopen_auth_logfiles() -> Result<(), Error> {
// only care about the most recent daemon instance for each, proxy & api, as other older ones
// should not respond to new requests anyway, but only finish their current one and then exit.
- let sock = crate::server::our_ctrl_sock();
+ let sock = proxmox_rest_server::our_ctrl_sock();
let f1 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-auth-log-reopen\"}\n");
- let pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
- let sock = crate::server::ctrl_sock_from_pid(pid);
+ let pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
+ let sock = proxmox_rest_server::ctrl_sock_from_pid(pid);
let f2 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-auth-log-reopen\"}\n");
match futures::join!(f1, f2) {
let sleep_duration = core::time::Duration::new(0, 100_000_000);
loop {
- if !proxmox_backup::server::worker_is_active_local(&upid) {
+ if !proxmox_rest_server::worker_is_active_local(&upid) {
break;
}
tokio::time::sleep(sleep_duration).await;
let abort_future = async move {
while signal_stream.recv().await.is_some() {
println!("got shutdown request (SIGINT)");
- proxmox_backup::server::abort_local_worker(upid.clone());
+ proxmox_rest_server::abort_local_worker(upid.clone());
}
Ok::<_, Error>(())
};
- let result_future = proxmox_backup::server::wait_for_local_worker(upid_str);
+ let result_future = proxmox_rest_server::wait_for_local_worker(upid_str);
futures::select! {
result = result_future.fuse() => result?,
use anyhow::Error;
use pbs_api_types::Authid;
+use proxmox_rest_server::WorkerTask;
use crate::{
- server::WorkerTask,
server::jobstate::Job,
backup::DataStore,
};
use proxmox::api::{ApiResponseFuture, HttpError, Router, RpcEnvironment};
use proxmox::http_err;
-use proxmox_rest_server::normalize_uri_path;
+use proxmox_rest_server::{normalize_uri_path, WorkerTask};
use proxmox_rest_server::formatter::*;
-use crate::server::WorkerTask;
-
/// Hyper Service implementation to handle stateful H2 connections.
///
/// We use this kind of service to handle backup protocol
//! an example usage would be
//! ```no_run
//! # use anyhow::{bail, Error};
-//! # use proxmox_backup::server::TaskState;
+//! # use proxmox_rest_server::TaskState;
//! # use proxmox_backup::server::jobstate::*;
//! # fn some_code() -> TaskState { TaskState::OK { endtime: 0 } }
//! # fn code() -> Result<(), Error> {
use pbs_config::{open_backup_lockfile, BackupLockGuard};
use pbs_api_types::{UPID, JobScheduleStatus};
-use crate::server::{
- TaskState,
- upid_read_status,
- worker_is_active_local,
-};
+use proxmox_rest_server::{upid_read_status, worker_is_active_local, TaskState};
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
//! services. We want async IO, so this is built on top of
//! tokio/hyper.
-use anyhow::{format_err, Error};
-use lazy_static::lazy_static;
-use nix::unistd::Pid;
+use anyhow::Error;
use serde_json::Value;
-use proxmox::sys::linux::procfs::PidStat;
use proxmox::tools::fs::{create_path, CreateOptions};
use pbs_buildcfg;
-lazy_static! {
- static ref PID: i32 = unsafe { libc::getpid() };
- static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime;
-}
-
-pub fn pid() -> i32 {
- *PID
-}
-
-pub fn pstart() -> u64 {
- *PSTART
-}
-
-pub fn write_pid(pid_fn: &str) -> Result<(), Error> {
- let pid_str = format!("{}\n", *PID);
- proxmox::tools::fs::replace_file(pid_fn, pid_str.as_bytes(), CreateOptions::new())
-}
-
-pub fn read_pid(pid_fn: &str) -> Result<i32, Error> {
- let pid = proxmox::tools::fs::file_get_contents(pid_fn)?;
- let pid = std::str::from_utf8(&pid)?.trim();
- pid.parse().map_err(|err| format_err!("could not parse pid - {}", err))
-}
-
-pub fn ctrl_sock_from_pid(pid: i32) -> String {
- format!("\0{}/control-{}.sock", pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, pid)
-}
-
-pub fn our_ctrl_sock() -> String {
- ctrl_sock_from_pid(*PID)
-}
-
-mod worker_task;
-pub use worker_task::*;
-
mod h2service;
pub use h2service::*;
pub mod pull;
pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
- let proxy_pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
- let sock = crate::server::ctrl_sock_from_pid(proxy_pid);
+ let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
+ let sock = proxmox_rest_server::ctrl_sock_from_pid(proxy_pid);
let _: Value = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"reload-certificate\"}\n")
.await?;
Ok(())
}
pub(crate) async fn notify_datastore_removed() -> Result<(), Error> {
- let proxy_pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
- let sock = crate::server::ctrl_sock_from_pid(proxy_pid);
+ let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
+ let sock = proxmox_rest_server::ctrl_sock_from_pid(proxy_pid);
let _: Value = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"datastore-removed\"}\n")
.await?;
Ok(())
use anyhow::Error;
-use pbs_datastore::{task_log, task_warn};
use pbs_datastore::backup_info::BackupInfo;
use pbs_datastore::prune::compute_prune_info;
use pbs_api_types::{Authid, PRIV_DATASTORE_MODIFY, PruneOptions};
use pbs_config::CachedUserInfo;
+use pbs_tools::{task_log, task_warn};
+use proxmox_rest_server::WorkerTask;
use crate::{
backup::DataStore,
server::jobstate::Job,
- server::WorkerTask,
-};
+};
pub fn prune_datastore(
worker: Arc<WorkerTask>,
use proxmox::api::error::{HttpError, StatusCode};
use pbs_api_types::{Authid, SnapshotListItem, GroupListItem};
-use pbs_datastore::{task_log, BackupInfo, BackupDir, BackupGroup, StoreProgress};
+use pbs_datastore::{BackupInfo, BackupDir, BackupGroup, StoreProgress};
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type
};
use pbs_tools::sha::sha256;
+use pbs_tools::task_log;
use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader};
+use proxmox_rest_server::WorkerTask;
use crate::{
backup::DataStore,
- server::WorkerTask,
tools::ParallelHandler,
};
use anyhow::{format_err, Error};
-use pbs_datastore::task_log;
+use pbs_tools::task_log;
use pbs_api_types::{Authid, VerificationJobConfig};
+use proxmox_rest_server::WorkerTask;
use crate::{
- server::WorkerTask,
server::jobstate::Job,
backup::{
DataStore,
+++ /dev/null
-use std::collections::{HashMap, VecDeque};
-use std::fs::File;
-use std::path::PathBuf;
-use std::io::{Read, Write, BufRead, BufReader};
-use std::panic::UnwindSafe;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, Mutex};
-
-use anyhow::{bail, format_err, Error};
-use futures::*;
-use lazy_static::lazy_static;
-use serde_json::{json, Value};
-use serde::{Serialize, Deserialize};
-use tokio::sync::oneshot;
-use nix::fcntl::OFlag;
-use once_cell::sync::OnceCell;
-
-use proxmox::sys::linux::procfs;
-use proxmox::try_block;
-use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
-use proxmox::api::upid::UPID;
-
-use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
-use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
-
-struct TaskListLockGuard(File);
-
-struct WorkerTaskSetup {
- file_opts: CreateOptions,
- taskdir: PathBuf,
- task_lock_fn: PathBuf,
- active_tasks_fn: PathBuf,
- task_index_fn: PathBuf,
- task_archive_fn: PathBuf,
-}
-
-static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
-
-fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
- WORKER_TASK_SETUP.get()
- .ok_or_else(|| format_err!("WorkerTask library is not initialized"))
-}
-
-impl WorkerTaskSetup {
-
- fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {
-
- let mut taskdir = basedir.clone();
- taskdir.push("tasks");
-
- let mut task_lock_fn = taskdir.clone();
- task_lock_fn.push(".active.lock");
-
- let mut active_tasks_fn = taskdir.clone();
- active_tasks_fn.push("active");
-
- let mut task_index_fn = taskdir.clone();
- task_index_fn.push("index");
-
- let mut task_archive_fn = taskdir.clone();
- task_archive_fn.push("archive");
-
- Self {
- file_opts,
- taskdir,
- task_lock_fn,
- active_tasks_fn,
- task_index_fn,
- task_archive_fn,
- }
- }
-
- fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {
- let options = self.file_opts.clone()
- .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
-
- let timeout = std::time::Duration::new(10, 0);
-
- let file = proxmox::tools::fs::open_file_locked(
- &self.task_lock_fn,
- timeout,
- exclusive,
- options,
- )?;
-
- Ok(TaskListLockGuard(file))
- }
-
- fn log_path(&self, upid: &UPID) -> std::path::PathBuf {
- let mut path = self.taskdir.clone();
- path.push(format!("{:02X}", upid.pstart % 256));
- path.push(upid.to_string());
- path
- }
-
- // atomically read/update the task list, update status of finished tasks
- // new_upid is added to the list when specified.
- fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {
-
- let lock = self.lock_task_list_files(true)?;
-
- // TODO remove with 1.x
- let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?;
- let had_index_file = !finish_list.is_empty();
-
- // We use filter_map because one negative case wants to *move* the data into `finish_list`,
- // clippy doesn't quite catch this!
- #[allow(clippy::unnecessary_filter_map)]
- let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)?
- .into_iter()
- .filter_map(|info| {
- if info.state.is_some() {
- // this can happen when the active file still includes finished tasks
- finish_list.push(info);
- return None;
- }
-
- if !worker_is_active_local(&info.upid) {
- // println!("Detected stopped task '{}'", &info.upid_str);
- let now = proxmox::tools::time::epoch_i64();
- let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
- finish_list.push(TaskListInfo {
- upid: info.upid,
- upid_str: info.upid_str,
- state: Some(status)
- });
- return None;
- }
-
- Some(info)
- }).collect();
-
- if let Some(upid) = new_upid {
- active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
- }
-
- let active_raw = render_task_list(&active_list);
-
- let options = self.file_opts.clone()
- .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
-
- replace_file(
- &self.active_tasks_fn,
- active_raw.as_bytes(),
- options,
- )?;
-
- finish_list.sort_unstable_by(|a, b| {
- match (&a.state, &b.state) {
- (Some(s1), Some(s2)) => s1.cmp(&s2),
- (Some(_), None) => std::cmp::Ordering::Less,
- (None, Some(_)) => std::cmp::Ordering::Greater,
- _ => a.upid.starttime.cmp(&b.upid.starttime),
- }
- });
-
- if !finish_list.is_empty() {
- let options = self.file_opts.clone()
- .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
-
- let mut writer = atomic_open_or_create_file(
- &self.task_archive_fn,
- OFlag::O_APPEND | OFlag::O_RDWR,
- &[],
- options,
- )?;
- for info in &finish_list {
- writer.write_all(render_task_line(&info).as_bytes())?;
- }
- }
-
- // TODO Remove with 1.x
- // for compatibility, if we had an INDEX file, we do not need it anymore
- if had_index_file {
- let _ = nix::unistd::unlink(&self.task_index_fn);
- }
-
- drop(lock);
-
- Ok(())
- }
-
- // Create task log directory with correct permissions
- fn create_task_log_dirs(&self) -> Result<(), Error> {
-
- try_block!({
- let dir_opts = self.file_opts.clone()
- .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
-
- create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?;
- // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
- Ok(())
- }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
- }
-}
-
-/// Initialize the WorkerTask library
-pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
- let setup = WorkerTaskSetup::new(basedir, file_opts);
- setup.create_task_log_dirs()?;
- WORKER_TASK_SETUP.set(setup)
- .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
-}
-
-/// checks if the Task Archive is bigger than 'size_threshold' bytes, and
-/// rotates it if it is
-pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
-
- let setup = worker_task_setup()?;
-
- let _lock = setup.lock_task_list_files(true)?;
-
- let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress)
- .ok_or_else(|| format_err!("could not get archive file names"))?;
-
- logrotate.rotate(size_threshold, None, max_files)
-}
-
-
-/// Path to the worker log file
-pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
- let setup = worker_task_setup()?;
- Ok(setup.log_path(upid))
-}
-
-/// Read endtime (time of last log line) and exit status from the task log file.
-/// If there is not a single line with a valid datetime, we assume the
-/// starttime to be the endtime
-pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
-
- let setup = worker_task_setup()?;
-
- let mut status = TaskState::Unknown { endtime: upid.starttime };
-
- let path = setup.log_path(upid);
-
- let mut file = File::open(path)?;
-
- /// speedup - only read tail
- use std::io::Seek;
- use std::io::SeekFrom;
- let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
-
- let mut data = Vec::with_capacity(8192);
- file.read_to_end(&mut data)?;
-
- // strip newlines at the end of the task logs
- while data.last() == Some(&b'\n') {
- data.pop();
- }
-
- let last_line = match data.iter().rposition(|c| *c == b'\n') {
- Some(start) if data.len() > (start+1) => &data[start+1..],
- Some(_) => &data, // should not happen, since we removed all trailing newlines
- None => &data,
- };
-
- let last_line = std::str::from_utf8(last_line)
- .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
-
- let mut iter = last_line.splitn(2, ": ");
- if let Some(time_str) = iter.next() {
- if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
- // set the endtime even if we cannot parse the state
- status = TaskState::Unknown { endtime };
- if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
- if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
- status = state;
- }
- }
- }
- }
-
- Ok(status)
-}
-
-lazy_static! {
- static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
-}
-
-/// checks if the task UPID refers to a worker from this process
-fn is_local_worker(upid: &UPID) -> bool {
- upid.pid == crate::server::pid() && upid.pstart == crate::server::pstart()
-}
-
-/// Test if the task is still running
-pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
- if is_local_worker(upid) {
- return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
- }
-
- if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() {
- return Ok(false);
- }
-
- let sock = crate::server::ctrl_sock_from_pid(upid.pid);
- let cmd = json!({
- "command": "worker-task-status",
- "args": {
- "upid": upid.to_string(),
- },
- });
- let status = proxmox_rest_server::send_command(sock, &cmd).await?;
-
- if let Some(active) = status.as_bool() {
- Ok(active)
- } else {
- bail!("got unexpected result {:?} (expected bool)", status);
- }
-}
-
-/// Test if the task is still running (fast but inaccurate implementation)
-///
-/// If the task is spawned from a different process, we simply return whether
-/// that process is still running. This information is good enough to detect
-/// stale tasks...
-pub fn worker_is_active_local(upid: &UPID) -> bool {
- if is_local_worker(upid) {
- WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
- } else {
- procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
- }
-}
-
-pub fn register_task_control_commands(
- commando_sock: &mut CommandoSocket,
-) -> Result<(), Error> {
- fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
- let args = if let Some(args) = args { args } else { bail!("missing args") };
- let upid = match args.get("upid") {
- Some(Value::String(upid)) => upid.parse::<UPID>()?,
- None => bail!("no upid in args"),
- _ => bail!("unable to parse upid"),
- };
- if !is_local_worker(&upid) {
- bail!("upid does not belong to this process");
- }
- Ok(upid)
- }
-
- commando_sock.register_command("worker-task-abort".into(), move |args| {
- let upid = get_upid(args)?;
-
- abort_local_worker(upid);
-
- Ok(Value::Null)
- })?;
- commando_sock.register_command("worker-task-status".into(), move |args| {
- let upid = get_upid(args)?;
-
- let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id);
-
- Ok(active.into())
- })?;
-
- Ok(())
-}
-
-pub fn abort_worker_async(upid: UPID) {
- tokio::spawn(async move {
- if let Err(err) = abort_worker(upid).await {
- eprintln!("abort worker failed - {}", err);
- }
- });
-}
-
-pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
-
- let sock = crate::server::ctrl_sock_from_pid(upid.pid);
- let cmd = json!({
- "command": "worker-task-abort",
- "args": {
- "upid": upid.to_string(),
- },
- });
- proxmox_rest_server::send_command(sock, &cmd).map_ok(|_| ()).await
-}
-
-fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
-
- let data = line.splitn(3, ' ').collect::<Vec<&str>>();
-
- let len = data.len();
-
- match len {
- 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)),
- 3 => {
- let endtime = i64::from_str_radix(data[1], 16)?;
- let state = TaskState::from_endtime_and_message(endtime, data[2])?;
- Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state)))
- }
- _ => bail!("wrong number of components"),
- }
-}
-
-/// Task State
-#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
-pub enum TaskState {
- /// The Task ended with an undefined state
- Unknown { endtime: i64 },
- /// The Task ended and there were no errors or warnings
- OK { endtime: i64 },
-    /// The Task had 'count' warnings and no errors
- Warning { count: u64, endtime: i64 },
- /// The Task ended with the error described in 'message'
- Error { message: String, endtime: i64 },
-}
-
-impl TaskState {
- pub fn endtime(&self) -> i64 {
- match *self {
- TaskState::Unknown { endtime } => endtime,
- TaskState::OK { endtime } => endtime,
- TaskState::Warning { endtime, .. } => endtime,
- TaskState::Error { endtime, .. } => endtime,
- }
- }
-
- fn result_text(&self) -> String {
- match self {
- TaskState::Error { message, .. } => format!("TASK ERROR: {}", message),
- other => format!("TASK {}", other),
- }
- }
-
- fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> {
- if s == "unknown" {
- Ok(TaskState::Unknown { endtime })
- } else if s == "OK" {
- Ok(TaskState::OK { endtime })
- } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") {
- let count: u64 = warnings.parse()?;
- Ok(TaskState::Warning{ count, endtime })
- } else if !s.is_empty() {
- let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string();
- Ok(TaskState::Error{ message, endtime })
- } else {
- bail!("unable to parse Task Status '{}'", s);
- }
- }
-}
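For reference, the "TASK ..." suffix that result_text() writes to the final log line and from_endtime_and_message() parses back maps onto the variants like this. A minimal sketch using only behaviour visible above; both helpers are private, so it only compiles inside this module:

    // Hedged sketch: round-trip between the status text and TaskState.
    fn status_text_roundtrip() -> Result<(), Error> {
        let endtime = proxmox::tools::time::epoch_i64();

        // final log line "...: TASK OK"
        assert_eq!(
            TaskState::from_endtime_and_message(endtime, "OK")?,
            TaskState::OK { endtime }
        );

        // "...: TASK WARNINGS: 3"
        assert_eq!(
            TaskState::from_endtime_and_message(endtime, "WARNINGS: 3")?,
            TaskState::Warning { count: 3, endtime }
        );

        // "...: TASK ERROR: no space left" - the "ERROR: " prefix is stripped
        assert_eq!(
            TaskState::from_endtime_and_message(endtime, "ERROR: no space left")?,
            TaskState::Error { message: "no space left".to_string(), endtime }
        );

        Ok(())
    }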
-
-impl std::cmp::PartialOrd for TaskState {
- fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
- Some(self.endtime().cmp(&other.endtime()))
- }
-}
-
-impl std::cmp::Ord for TaskState {
- fn cmp(&self, other: &Self) -> std::cmp::Ordering {
- self.endtime().cmp(&other.endtime())
- }
-}
-
-impl std::fmt::Display for TaskState {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- TaskState::Unknown { .. } => write!(f, "unknown"),
- TaskState::OK { .. }=> write!(f, "OK"),
- TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count),
- TaskState::Error { message, .. } => write!(f, "{}", message),
- }
- }
-}
-
-/// Task details including parsed UPID
-///
-/// If there is no `state`, the task is still running.
-#[derive(Debug)]
-pub struct TaskListInfo {
- /// The parsed UPID
- pub upid: UPID,
- /// UPID string representation
- pub upid_str: String,
- /// Task `(endtime, status)` if already finished
- pub state: Option<TaskState>, // endtime, status
-}
-
-impl Into<pbs_api_types::TaskListItem> for TaskListInfo {
- fn into(self) -> pbs_api_types::TaskListItem {
- let (endtime, status) = self
- .state
- .map_or_else(|| (None, None), |a| (Some(a.endtime()), Some(a.to_string())));
-
- pbs_api_types::TaskListItem {
- upid: self.upid_str,
- node: "localhost".to_string(),
- pid: self.upid.pid as i64,
- pstart: self.upid.pstart,
- starttime: self.upid.starttime,
- worker_type: self.upid.worker_type,
- worker_id: self.upid.worker_id,
- user: self.upid.auth_id,
- endtime,
- status,
- }
- }
-}
-
-fn render_task_line(info: &TaskListInfo) -> String {
- let mut raw = String::new();
- if let Some(status) = &info.state {
- raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status));
- } else {
- raw.push_str(&info.upid_str);
- raw.push('\n');
- }
-
- raw
-}
-
-fn render_task_list(list: &[TaskListInfo]) -> String {
- let mut raw = String::new();
- for info in list {
- raw.push_str(&render_task_line(&info));
- }
- raw
-}
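To make the on-disk layout handled by render_task_line() and parse_worker_status_line() explicit: each task occupies one line in the active/archive files, either the bare UPID (still running, state = None) or the UPID followed by the end time as eight upper-case hex digits and the status text. Illustrative lines, with <upid> and the timestamp as placeholders:

    <upid>
    <upid> 6112AA10 OK
    <upid> 6112AA10 WARNINGS: 2
    <upid> 6112AA10 some error message

Note that the Display impl above renders an error state as the bare message, while the parser also accepts and strips an optional "ERROR: " prefix.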
-
-// note this is not locked, caller has to make sure it is
-// this will skip (and log) lines that are not valid status lines
-fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error>
-{
- let reader = BufReader::new(reader);
- let mut list = Vec::new();
- for line in reader.lines() {
- let line = line?;
- match parse_worker_status_line(&line) {
- Ok((upid_str, upid, state)) => list.push(TaskListInfo {
- upid_str,
- upid,
- state
- }),
- Err(err) => {
- eprintln!("unable to parse worker status '{}' - {}", line, err);
- continue;
- }
- };
- }
-
- Ok(list)
-}
-
-// note this is not locked, caller has to make sure it is
-fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error>
-where
- P: AsRef<std::path::Path> + std::fmt::Debug,
-{
- let file = match File::open(&path) {
- Ok(f) => f,
- Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
- Err(err) => bail!("unable to open task list {:?} - {}", path, err),
- };
-
- read_task_file(file)
-}
-
-pub struct TaskListInfoIterator {
- list: VecDeque<TaskListInfo>,
- end: bool,
- archive: Option<LogRotateFiles>,
- lock: Option<TaskListLockGuard>,
-}
-
-impl TaskListInfoIterator {
- pub fn new(active_only: bool) -> Result<Self, Error> {
-
- let setup = worker_task_setup()?;
-
- let (read_lock, active_list) = {
- let lock = setup.lock_task_list_files(false)?;
- let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
-
- let needs_update = active_list
- .iter()
- .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
-
- // TODO remove with 1.x
- let index_exists = setup.task_index_fn.is_file();
-
- if needs_update || index_exists {
- drop(lock);
- setup.update_active_workers(None)?;
- let lock = setup.lock_task_list_files(false)?;
- let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
- (lock, active_list)
- } else {
- (lock, active_list)
- }
- };
-
- let archive = if active_only {
- None
- } else {
- let logrotate = LogRotate::new(&setup.task_archive_fn, true)
- .ok_or_else(|| format_err!("could not get archive file names"))?;
- Some(logrotate.files())
- };
-
- let lock = if active_only { None } else { Some(read_lock) };
-
- Ok(Self {
- list: active_list.into(),
- end: active_only,
- archive,
- lock,
- })
- }
-}
-
-impl Iterator for TaskListInfoIterator {
- type Item = Result<TaskListInfo, Error>;
-
- fn next(&mut self) -> Option<Self::Item> {
- loop {
- if let Some(element) = self.list.pop_back() {
- return Some(Ok(element));
- } else if self.end {
- return None;
- } else {
- if let Some(mut archive) = self.archive.take() {
- if let Some(file) = archive.next() {
- let list = match read_task_file(file) {
- Ok(list) => list,
- Err(err) => return Some(Err(err)),
- };
- self.list.append(&mut list.into());
- self.archive = Some(archive);
- continue;
- }
- }
-
- self.end = true;
- self.lock.take();
- }
- }
- }
-}
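A caller-side sketch of how the iterator might be used to walk every known task, active list first and then the rotated archives. This assumes the type stays publicly exported after the move; names are otherwise taken from the code above:

    fn dump_all_tasks() -> Result<(), Error> {
        // `false` also walks the rotated archive files, not only the active list
        for info in TaskListInfoIterator::new(false)? {
            let info = info?; // reading an archive file can fail
            match &info.state {
                Some(state) => println!("{} finished: {}", info.upid_str, state),
                None => println!("{} still running", info.upid_str),
            }
        }
        Ok(())
    }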
-
-/// Launch long running worker tasks.
-///
-/// A worker task can either be a whole thread, or simply a tokio
-/// task/future. Each task can `log()` messages, which are stored
-/// persistently to files. Tasks should poll the `abort_requested`
-/// flag, and stop execution when requested.
-pub struct WorkerTask {
- setup: &'static WorkerTaskSetup,
- upid: UPID,
- data: Mutex<WorkerTaskData>,
- abort_requested: AtomicBool,
-}
-
-impl std::fmt::Display for WorkerTask {
-
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- self.upid.fmt(f)
- }
-}
-
-struct WorkerTaskData {
- logger: FileLogger,
- progress: f64, // 0..1
- warn_count: u64,
- pub abort_listeners: Vec<oneshot::Sender<()>>,
-}
-
-impl WorkerTask {
-
- pub fn new(
- worker_type: &str,
- worker_id: Option<String>,
- auth_id: String,
- to_stdout: bool,
- ) -> Result<Arc<Self>, Error> {
-
- let setup = worker_task_setup()?;
-
- let upid = UPID::new(worker_type, worker_id, auth_id)?;
- let task_id = upid.task_id;
-
- let mut path = setup.taskdir.clone();
-
- path.push(format!("{:02X}", upid.pstart & 255));
-
- let dir_opts = setup.file_opts.clone()
- .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
-
- create_path(&path, None, Some(dir_opts))?;
-
- path.push(upid.to_string());
-
- let logger_options = FileLogOptions {
- to_stdout,
- exclusive: true,
- prefix_time: true,
- read: true,
- file_opts: setup.file_opts.clone(),
- ..Default::default()
- };
- let logger = FileLogger::new(&path, logger_options)?;
-
- let worker = Arc::new(Self {
- setup,
- upid: upid.clone(),
- abort_requested: AtomicBool::new(false),
- data: Mutex::new(WorkerTaskData {
- logger,
- progress: 0.0,
- warn_count: 0,
- abort_listeners: vec![],
- }),
- });
-
- // scope to drop the lock again after inserting
- {
- let mut hash = WORKER_TASK_LIST.lock().unwrap();
- hash.insert(task_id, worker.clone());
- proxmox_rest_server::set_worker_count(hash.len());
- }
-
- setup.update_active_workers(Some(&upid))?;
-
- Ok(worker)
- }
-
- /// Spawn a new tokio task/future.
- pub fn spawn<F, T>(
- worker_type: &str,
- worker_id: Option<String>,
- auth_id: String,
- to_stdout: bool,
- f: F,
- ) -> Result<String, Error>
- where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
- T: Send + 'static + Future<Output = Result<(), Error>>,
- {
- let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
- let upid_str = worker.upid.to_string();
- let f = f(worker.clone());
- tokio::spawn(async move {
- let result = f.await;
- worker.log_result(&result);
- });
-
- Ok(upid_str)
- }
-
- /// Create a new worker thread.
- pub fn new_thread<F>(
- worker_type: &str,
- worker_id: Option<String>,
- auth_id: String,
- to_stdout: bool,
- f: F,
- ) -> Result<String, Error>
- where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>
- {
- let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
- let upid_str = worker.upid.to_string();
-
- let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || {
- let worker1 = worker.clone();
- let result = match std::panic::catch_unwind(move || f(worker1)) {
- Ok(r) => r,
- Err(panic) => {
- match panic.downcast::<&str>() {
- Ok(panic_msg) => {
- Err(format_err!("worker panicked: {}", panic_msg))
- }
- Err(_) => {
- Err(format_err!("worker panicked: unknown type."))
- }
- }
- }
- };
-
- worker.log_result(&result);
- });
-
- Ok(upid_str)
- }
-
- /// create state from self and a result
- pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
- let warn_count = self.data.lock().unwrap().warn_count;
-
- let endtime = proxmox::tools::time::epoch_i64();
-
- if let Err(err) = result {
- TaskState::Error { message: err.to_string(), endtime }
- } else if warn_count > 0 {
- TaskState::Warning { count: warn_count, endtime }
- } else {
- TaskState::OK { endtime }
- }
- }
-
- /// Log task result, remove task from running list
- pub fn log_result(&self, result: &Result<(), Error>) {
- let state = self.create_state(result);
- self.log(state.result_text());
-
- WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
- let _ = self.setup.update_active_workers(None);
- proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
- }
-
- /// Log a message.
- pub fn log<S: AsRef<str>>(&self, msg: S) {
- let mut data = self.data.lock().unwrap();
- data.logger.log(msg);
- }
-
- /// Log a message as warning.
- pub fn warn<S: AsRef<str>>(&self, msg: S) {
- let mut data = self.data.lock().unwrap();
- data.logger.log(format!("WARN: {}", msg.as_ref()));
- data.warn_count += 1;
- }
-
- /// Set progress indicator
- pub fn progress(&self, progress: f64) {
- if progress >= 0.0 && progress <= 1.0 {
- let mut data = self.data.lock().unwrap();
- data.progress = progress;
- } else {
- // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
- }
- }
-
- /// Request abort
- pub fn request_abort(&self) {
- eprintln!("set abort flag for worker {}", self.upid);
-
- let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst);
- if !prev_abort { // log abort one time
- self.log(format!("received abort request ..."));
- }
-        // notify listeners
- let mut data = self.data.lock().unwrap();
- loop {
- match data.abort_listeners.pop() {
- None => { break; },
- Some(ch) => {
- let _ = ch.send(()); // ignore errors here
- },
- }
- }
- }
-
- /// Test if abort was requested.
- pub fn abort_requested(&self) -> bool {
- self.abort_requested.load(Ordering::SeqCst)
- }
-
- /// Fail if abort was requested.
- pub fn fail_on_abort(&self) -> Result<(), Error> {
- if self.abort_requested() {
- bail!("abort requested - aborting task");
- }
- Ok(())
- }
-
- /// Get a future which resolves on task abort
- pub fn abort_future(&self) -> oneshot::Receiver<()> {
- let (tx, rx) = oneshot::channel::<()>();
-
- let mut data = self.data.lock().unwrap();
- if self.abort_requested() {
- let _ = tx.send(());
- } else {
- data.abort_listeners.push(tx);
- }
- rx
- }
-
- pub fn upid(&self) -> &UPID {
- &self.upid
- }
-}
-
-impl pbs_datastore::task::TaskState for WorkerTask {
- fn check_abort(&self) -> Result<(), Error> {
- self.fail_on_abort()
- }
-
- fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
- match level {
- log::Level::Error => self.warn(&message.to_string()),
- log::Level::Warn => self.warn(&message.to_string()),
- log::Level::Info => self.log(&message.to_string()),
- log::Level::Debug => self.log(&format!("DEBUG: {}", message)),
- log::Level::Trace => self.log(&format!("TRACE: {}", message)),
- }
- }
-}
-
-/// Wait for a locally spawned worker task
-///
-/// Note: local workers should print logs to stdout, so there is no
-/// need to fetch/display logs. We just wait for the worker to finish.
-pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
-
- let upid: UPID = upid_str.parse()?;
-
- let sleep_duration = core::time::Duration::new(0, 100_000_000);
-
- loop {
- if worker_is_active_local(&upid) {
- tokio::time::sleep(sleep_duration).await;
- } else {
- break;
- }
- }
- Ok(())
-}
-
-/// Request abort of a local worker (if existing and running)
-pub fn abort_local_worker(upid: UPID) {
- if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) {
- worker.request_abort();
- }
-}
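The remaining hunks only adjust callers: WorkerTask and the init/abort helpers now come from proxmox_rest_server instead of crate::server, and the auth id is passed as a plain String. A minimal caller-side sketch of the relocated API, assuming the method set is unchanged by the move (worker type, id and auth id are illustrative):

    use anyhow::Error;
    use proxmox_rest_server::WorkerTask;

    fn start_example_worker() -> Result<String, Error> {
        WorkerTask::new_thread(
            "example",                    // worker_type (illustrative)
            Some("store1".to_string()),   // worker_id (illustrative)
            "root@pam".to_string(),       // auth_id is now a String
            false,                        // to_stdout
            |worker| {
                worker.log("doing some work");
                worker.fail_on_abort()?;  // bail out once an abort was requested
                Ok(())
            },
        )
    }

The returned String is the UPID, which can later be handed to abort_worker_async() or wait_for_local_worker(), as the adapted test below does.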
use pbs_api_types::{VirtualTapeDrive, LtoTapeDrive, Fingerprint};
use pbs_config::key_config::KeyConfig;
-use pbs_datastore::task::TaskState;
-use pbs_datastore::task_log;
+use pbs_tools::{task_log, task::TaskState};
use pbs_tape::{
TapeWrite, TapeRead, BlockReadError, MediaContentHeader,
sg_tape::TapeAlertFlags,
};
+use proxmox_rest_server::WorkerTask;
use crate::{
- server::{
- send_load_media_email,
- WorkerTask,
- },
+ server::send_load_media_email,
tape::{
MediaId,
drive::{
use proxmox::tools::Uuid;
-use pbs_datastore::task_log;
+use pbs_tools::task_log;
use pbs_config::tape_encryption_keys::load_key_configs;
use pbs_tape::{
TapeWrite,
sg_tape::tape_alert_flags_critical,
};
+use proxmox_rest_server::WorkerTask;
use crate::{
backup::{DataStore, SnapshotReader},
- server::WorkerTask,
tape::{
TAPE_STATUS_DIR,
MAX_CHUNK_ARCHIVE_SIZE,
extern crate nix;
use proxmox::try_block;
+use proxmox::tools::fs::CreateOptions;
use pbs_api_types::{Authid, UPID};
-use proxmox_rest_server::{flog, CommandoSocket};
-use proxmox_backup::server;
+use proxmox_rest_server::{flog, CommandoSocket, WorkerTask};
-fn garbage_collection(worker: &server::WorkerTask) -> Result<(), Error> {
+fn garbage_collection(worker: &WorkerTask) -> Result<(), Error> {
worker.log("start garbage collection");
#[test]
#[ignore]
fn worker_task_abort() -> Result<(), Error> {
-
- server::create_task_log_dirs()?;
-
+ let uid = nix::unistd::Uid::current();
+ let gid = nix::unistd::Gid::current();
+
+ let file_opts = CreateOptions::new().owner(uid).group(gid);
+ proxmox_rest_server::init_worker_tasks("./target/tasklogtestdir".into(), file_opts.clone())?;
+
use std::sync::{Arc, Mutex};
let errmsg: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
- let mut commando_sock = CommandoSocket::new(server::our_ctrl_sock(), nix::unistd::Gid::current());
+ let mut commando_sock = CommandoSocket::new(
+ proxmox_rest_server::our_ctrl_sock(), nix::unistd::Gid::current());
let init_result: Result<(), Error> = try_block!({
- server::register_task_control_commands(&mut commando_sock)?;
+ proxmox_rest_server::register_task_control_commands(&mut commando_sock)?;
proxmox_rest_server::server_state_init()?;
Ok(())
});
}
let errmsg = errmsg1.clone();
- let res = server::WorkerTask::new_thread(
+ let res = WorkerTask::new_thread(
"garbage_collection",
None,
- Authid::root_auth_id().clone(),
+ Authid::root_auth_id().to_string(),
true,
move |worker| {
println!("WORKER {}", worker);
}
Ok(wid) => {
println!("WORKER: {}", wid);
- server::abort_worker_async(wid.parse::<UPID>().unwrap());
- server::wait_for_local_worker(&wid).await.unwrap();
+ proxmox_rest_server::abort_worker_async(wid.parse::<UPID>().unwrap());
+ proxmox_rest_server::wait_for_local_worker(&wid).await.unwrap();
}
}
});
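For the abort in the adapted test to actually terminate the worker, the worker body has to cooperate by checking the abort flag. A hedged sketch of the loop a garbage_collection-style worker could run; iteration count and sleep interval are illustrative:

    move |worker| {
        for i in 0..100 {
            worker.fail_on_abort()?;  // returns Err once an abort was requested
            worker.log(format!("step {}", i));
            std::thread::sleep(std::time::Duration::from_millis(100));
        }
        Ok(())
    }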