git.proxmox.com Git - proxmox-backup.git/commitdiff
move jobstate to server
author    Dietmar Maurer <dietmar@proxmox.com>
Wed, 28 Oct 2020 06:33:05 +0000 (07:33 +0100)
committer Dietmar Maurer <dietmar@proxmox.com>
Wed, 28 Oct 2020 06:37:01 +0000 (07:37 +0100)
13 files changed:
src/api2/admin/sync.rs
src/api2/admin/verify.rs
src/api2/config/datastore.rs
src/api2/config/sync.rs
src/api2/config/verify.rs
src/api2/pull.rs
src/backup/verify.rs
src/bin/proxmox-backup-api.rs
src/bin/proxmox-backup-proxy.rs
src/config.rs
src/config/jobstate.rs [deleted file]
src/server.rs
src/server/jobstate.rs [new file with mode: 0644]
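This commit is a pure module move: the jobstate helpers leave src/config/ and become part of src/server/. For callers, only the import path changes; the API surface stays the same, as the hunks below show. A minimal sketch of the path change (the "syncjob"/"example-job" names are placeholders, and the calls only do something useful on a host where the jobstate directory exists):

```rust
// After this commit, the jobstate helpers live under `server` instead of `config`.
use proxmox_backup::server::jobstate::{self, Job, JobState}; // was: proxmox_backup::config::jobstate

fn example() -> Result<(), anyhow::Error> {
    // Same API as before the move, only the module path differs.
    jobstate::create_state_file("syncjob", "example-job")?;
    let _state = JobState::load("syncjob", "example-job")?;
    let _job: Job = Job::new("syncjob", "example-job")?;
    Ok(())
}
```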

diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs
index e1072f7266ed0428a744336f895f4facd7b59066..4a10444167ad33c0ebc88b7d3bbfa95a24bb5627 100644
@@ -9,7 +9,7 @@ use crate::api2::types::*;
 use crate::api2::pull::do_sync_job;
 use crate::config::sync::{self, SyncJobStatus, SyncJobConfig};
 use crate::server::UPID;
-use crate::config::jobstate::{Job, JobState};
+use crate::server::jobstate::{Job, JobState};
 use crate::tools::systemd::time::{
     parse_calendar_event, compute_next_event};
 
diff --git a/src/api2/admin/verify.rs b/src/api2/admin/verify.rs
index c5d84b433bc796ae75619789658d14bd30ab2a6d..4bc184d3fea30454eb66050e4de34aaf0074828c 100644
@@ -5,8 +5,8 @@ use proxmox::{list_subdirs_api_method, sortable};
 use proxmox::api::{api, ApiMethod, Router, RpcEnvironment};
 
 use crate::api2::types::*;
-use crate::backup::do_verification_job;
-use crate::config::jobstate::{Job, JobState};
+use crate::server::do_verification_job;
+use crate::server::jobstate::{Job, JobState};
 use crate::config::verify;
 use crate::config::verify::{VerificationJobConfig, VerificationJobStatus};
 use serde_json::Value;
diff --git a/src/api2/config/datastore.rs b/src/api2/config/datastore.rs
index 4f9cb7c764016a2e3ebe0f47aa75dec79b071757..4847bdadab3be896ba6a9a43d8fb0e5a57ab5e90 100644
@@ -12,6 +12,7 @@ use crate::backup::*;
 use crate::config::cached_user_info::CachedUserInfo;
 use crate::config::datastore::{self, DataStoreConfig, DIR_NAME_SCHEMA};
 use crate::config::acl::{PRIV_DATASTORE_ALLOCATE, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_MODIFY};
+use crate::server::jobstate;
 
 #[api(
     input: {
@@ -127,8 +128,8 @@ pub fn create_datastore(param: Value) -> Result<(), Error> {
 
     datastore::save_config(&config)?;
 
-    crate::config::jobstate::create_state_file("prune", &datastore.name)?;
-    crate::config::jobstate::create_state_file("garbage_collection", &datastore.name)?;
+    jobstate::create_state_file("prune", &datastore.name)?;
+    jobstate::create_state_file("garbage_collection", &datastore.name)?;
 
     Ok(())
 }
@@ -328,11 +329,11 @@ pub fn update_datastore(
     // we want to reset the statefiles, to avoid an immediate action in some cases
     // (e.g. going from monthly to weekly in the second week of the month)
     if gc_schedule_changed {
-        crate::config::jobstate::create_state_file("garbage_collection", &name)?;
+        jobstate::create_state_file("garbage_collection", &name)?;
     }
 
     if prune_schedule_changed {
-        crate::config::jobstate::create_state_file("prune", &name)?;
+        jobstate::create_state_file("prune", &name)?;
     }
 
     Ok(())
@@ -375,8 +376,8 @@ pub fn delete_datastore(name: String, digest: Option<String>) -> Result<(), Erro
     datastore::save_config(&config)?;
 
     // ignore errors
-    let _ = crate::config::jobstate::remove_state_file("prune", &name);
-    let _ = crate::config::jobstate::remove_state_file("garbage_collection", &name);
+    let _ = jobstate::remove_state_file("prune", &name);
+    let _ = jobstate::remove_state_file("garbage_collection", &name);
 
     Ok(())
 }
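The statefile reset in update_datastore() above relies on the semantics of create_state_file from the (moved) jobstate module further down: it overwrites the state with JobState::Created { time: now }, so last_run_time reports "just ran" and a schedule change cannot trigger an immediate run. A small sketch of that effect (crate-external paths; assumes a writable jobstate directory):

```rust
use anyhow::Error;
use proxmox_backup::server::jobstate;

fn reset_gc_schedule_state(store: &str) -> Result<(), Error> {
    // Overwrites the statefile with JobState::Created { time: now } ...
    jobstate::create_state_file("garbage_collection", store)?;

    // ... so the scheduler sees the job as having "run" right now and will
    // compute the next trigger relative to this moment, not the old run time.
    let last = jobstate::last_run_time("garbage_collection", store)?;
    assert!(last <= proxmox::tools::time::epoch_i64());
    Ok(())
}
```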
diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
index a3aa0ec5b72d6ddcd10333ae0f558f69550eee0f..e5baa7dacafcd0592ea667c117840db170f31015 100644
@@ -83,7 +83,7 @@ pub fn create_sync_job(param: Value) -> Result<(), Error> {
 
     sync::save_config(&config)?;
 
-    crate::config::jobstate::create_state_file("syncjob", &sync_job.id)?;
+    crate::server::jobstate::create_state_file("syncjob", &sync_job.id)?;
 
     Ok(())
 }
@@ -266,7 +266,7 @@ pub fn delete_sync_job(id: String, digest: Option<String>) -> Result<(), Error>
 
     sync::save_config(&config)?;
 
-    crate::config::jobstate::remove_state_file("syncjob", &id)?;
+    crate::server::jobstate::remove_state_file("syncjob", &id)?;
 
     Ok(())
 }
diff --git a/src/api2/config/verify.rs b/src/api2/config/verify.rs
index efc33a5c918f9f8a75f9bab3285feb8967c81b72..32d83d1759aa0b90f391b8f4e829a0f18fe2e03e 100644
@@ -80,7 +80,7 @@ pub fn create_verification_job(param: Value) -> Result<(), Error> {
 
     verify::save_config(&config)?;
 
-    crate::config::jobstate::create_state_file("verificationjob", &verification_job.id)?;
+    crate::server::jobstate::create_state_file("verificationjob", &verification_job.id)?;
 
     Ok(())
 }
@@ -258,7 +258,7 @@ pub fn delete_verification_job(id: String, digest: Option<String>) -> Result<(),
 
     verify::save_config(&config)?;
 
-    crate::config::jobstate::remove_state_file("verificationjob", &id)?;
+    crate::server::jobstate::remove_state_file("verificationjob", &id)?;
 
     Ok(())
 }
@@ -271,4 +271,4 @@ const ITEM_ROUTER: Router = Router::new()
 pub const ROUTER: Router = Router::new()
     .get(&API_METHOD_LIST_VERIFICATION_JOBS)
     .post(&API_METHOD_CREATE_VERIFICATION_JOB)
-    .match_all("id", &ITEM_ROUTER);
\ No newline at end of file
+    .match_all("id", &ITEM_ROUTER);
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index c78658b56bf7291e14384ee681872b197f23e497..441701a573015494c282973f7b5f891335e440ae 100644
@@ -7,14 +7,13 @@ use futures::{select, future::FutureExt};
 use proxmox::api::api;
 use proxmox::api::{ApiMethod, Router, RpcEnvironment, Permission};
 
-use crate::server::{WorkerTask};
+use crate::server::{WorkerTask, jobstate::Job};
 use crate::backup::DataStore;
 use crate::client::{HttpClient, HttpClientOptions, BackupRepository, pull::pull_store};
 use crate::api2::types::*;
 use crate::config::{
     remote,
     sync::SyncJobConfig,
-    jobstate::Job,
     acl::{PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ},
     cached_user_info::CachedUserInfo,
 };
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index abba38a568c30d475785d8fcf61d4a33f16138f3..2103da40da3304da49ec7524316667fef059b1eb 100644
@@ -7,10 +7,7 @@ use nix::dir::Dir;
 use anyhow::{bail, format_err, Error};
 
 use crate::{
-    server::WorkerTask,
     api2::types::*,
-    config::jobstate::Job,
-    config::verify::VerificationJobConfig,
     backup::{
         DataStore,
         DataBlob,
@@ -529,96 +526,3 @@ pub fn verify_all_backups(
 
     Ok(errors)
 }
-
-/// Runs a verification job.
-pub fn do_verification_job(
-    mut job: Job,
-    verification_job: VerificationJobConfig,
-    userid: &Userid,
-    schedule: Option<String>,
-) -> Result<String, Error> {
-    let datastore = DataStore::lookup_datastore(&verification_job.store)?;
-
-    let mut backups_to_verify = BackupInfo::list_backups(&datastore.base_path())?;
-    if verification_job.ignore_verified.unwrap_or(true) {
-        backups_to_verify.retain(|backup_info| {
-            let manifest = match datastore.load_manifest(&backup_info.backup_dir) {
-                Ok((manifest, _)) => manifest,
-                Err(_) => return false,
-            };
-
-            let raw_verify_state = manifest.unprotected["verify_state"].clone();
-            let last_state = match serde_json::from_value::<SnapshotVerifyState>(raw_verify_state) {
-                Ok(last_state) => last_state,
-                Err(_) => return true,
-            };
-
-            let now = proxmox::tools::time::epoch_i64();
-            let days_since_last_verify = (now - last_state.upid.starttime) / 86400;
-            verification_job.outdated_after.is_some()
-                && days_since_last_verify > verification_job.outdated_after.unwrap()
-        })
-    }
-
-    let job_id = job.jobname().to_string();
-    let worker_type = job.jobtype().to_string();
-    let upid_str = WorkerTask::new_thread(
-        &worker_type,
-        Some(job.jobname().to_string()),
-        userid.clone(),
-        false,
-        move |worker| {
-            job.start(&worker.upid().to_string())?;
-
-            task_log!(worker,"Starting datastore verify job '{}'", job_id);
-            task_log!(worker,"verifying {} backups", backups_to_verify.len());
-            if let Some(event_str) = schedule {
-                task_log!(worker,"task triggered by schedule '{}'", event_str);
-            }
-
-            let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 16)));
-            let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
-            let result = proxmox::try_block!({
-                let mut failed_dirs: Vec<String> = Vec::new();
-
-                for backup_info in backups_to_verify {
-                    let verification_result = verify_backup_dir(
-                        datastore.clone(),
-                        &backup_info.backup_dir,
-                        verified_chunks.clone(),
-                        corrupt_chunks.clone(),
-                        worker.clone(),
-                        worker.upid().clone()
-                    );
-
-                    if let Ok(false) = verification_result {
-                        failed_dirs.push(backup_info.backup_dir.to_string());
-                    } // otherwise successful or aborted
-                }
-
-                if !failed_dirs.is_empty() {
-                    task_log!(worker,"Failed to verify following snapshots:",);
-                    for dir in failed_dirs {
-                        task_log!(worker, "\t{}", dir)
-                    }
-                    bail!("verification failed - please check the log for details");
-                }
-                Ok(())
-            });
-
-            let status = worker.create_state(&result);
-
-            match job.finish(status) {
-                Err(err) => eprintln!(
-                    "could not finish job state for {}: {}",
-                    job.jobtype().to_string(),
-                    err
-                ),
-                Ok(_) => (),
-            }
-
-            result
-        },
-    )?;
-    Ok(upid_str)
-}
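Note that the removed do_verification_job is not re-added in any hunk of this commitdiff; judging by the new "pub use verify_job::*;" in src/server.rs and the proxy's switch to proxmox_backup::server::do_verification_job below, it presumably moves into src/server/verify_job.rs. A hypothetical skeleton of that module, with the signature copied from the deleted code above (the file's actual contents are not shown here):

```rust
// src/server/verify_job.rs -- hypothetical sketch, not part of this commitdiff
use anyhow::Error;

use crate::{
    api2::types::Userid,
    config::verify::VerificationJobConfig,
    server::jobstate::Job,
};

/// Runs a verification job (body as removed from src/backup/verify.rs above).
pub fn do_verification_job(
    mut job: Job,
    verification_job: VerificationJobConfig,
    userid: &Userid,
    schedule: Option<String>,
) -> Result<String, Error> {
    // ... worker/task logic unchanged from the deleted implementation ...
    let _ = (&mut job, &verification_job, userid, schedule);
    unimplemented!()
}
```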
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index 8f9a4c82f65965873112ebc8cad50705896984fb..c1dee2b8a7c731122fc17946d0602d1d92949646 100644
@@ -37,7 +37,7 @@ async fn run() -> Result<(), Error> {
     config::update_self_signed_cert(false)?;
 
     proxmox_backup::rrd::create_rrdb_dir()?;
-    proxmox_backup::config::jobstate::create_jobstate_dir()?;
+    proxmox_backup::server::jobstate::create_jobstate_dir()?;
 
     if let Err(err) = generate_auth_key() {
         bail!("unable to generate auth key - {}", err);
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 69ceb1cb7fbdc600f928be1863a3ffce22b4a102..ce2901713fad8c5841890c3d012b80c05c2ad4e2 100644
@@ -10,11 +10,30 @@ use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};
 use proxmox::try_block;
 use proxmox::api::RpcEnvironmentType;
 
+use proxmox_backup::{
+    backup::DataStore,
+    server::{
+        UPID,
+        WorkerTask,
+        ApiConfig,
+        rest::*,
+        jobstate::{
+            self,
+            Job,
+        },
+        rotate_task_log_archive,
+    },
+    tools::systemd::time::{
+        parse_calendar_event,
+        compute_next_event,
+    },
+};
+
+
 use proxmox_backup::api2::types::Userid;
 use proxmox_backup::configdir;
 use proxmox_backup::buildcfg;
 use proxmox_backup::server;
-use proxmox_backup::server::{ApiConfig, rest::*};
 use proxmox_backup::auth_helpers::*;
 use proxmox_backup::tools::{
     daemon,
@@ -29,7 +48,7 @@ use proxmox_backup::tools::{
 };
 
 use proxmox_backup::api2::pull::do_sync_job;
-use proxmox_backup::backup::do_verification_job;
+use proxmox_backup::server::do_verification_job;
 
 fn main() -> Result<(), Error> {
     proxmox_backup::tools::setup_safe_path_env();
@@ -221,14 +240,10 @@ async fn schedule_tasks() -> Result<(), Error> {
 
 async fn schedule_datastore_garbage_collection() {
 
-    use proxmox_backup::backup::DataStore;
-    use proxmox_backup::server::{UPID, WorkerTask};
-    use proxmox_backup::config::{
-        jobstate::{self, Job},
-        datastore::{self, DataStoreConfig}
+    use proxmox_backup::config::datastore::{
+        self,
+        DataStoreConfig,
     };
-    use proxmox_backup::tools::systemd::time::{
-        parse_calendar_event, compute_next_event};
 
     let config = match datastore::config() {
         Err(err) => {
@@ -340,15 +355,17 @@ async fn schedule_datastore_garbage_collection() {
 
 async fn schedule_datastore_prune() {
 
-    use proxmox_backup::backup::{
-        PruneOptions, DataStore, BackupGroup, compute_prune_info};
-    use proxmox_backup::server::{WorkerTask};
-    use proxmox_backup::config::{
-        jobstate::{self, Job},
-        datastore::{self, DataStoreConfig}
+    use proxmox_backup::{
+        backup::{
+            PruneOptions,
+            BackupGroup,
+            compute_prune_info,
+        },
+        config::datastore::{
+            self,
+            DataStoreConfig,
+        },
     };
-    use proxmox_backup::tools::systemd::time::{
-        parse_calendar_event, compute_next_event};
 
     let config = match datastore::config() {
         Err(err) => {
@@ -487,9 +504,9 @@ async fn schedule_datastore_prune() {
 
 async fn schedule_datastore_sync_jobs() {
 
-    use proxmox_backup::{
-        config::{ sync::{self, SyncJobConfig}, jobstate::{self, Job} },
-        tools::systemd::time::{ parse_calendar_event, compute_next_event },
+    use proxmox_backup::config::sync::{
+        self,
+        SyncJobConfig,
     };
 
     let config = match sync::config() {
@@ -559,10 +576,12 @@ async fn schedule_datastore_sync_jobs() {
 }
 
 async fn schedule_datastore_verify_jobs() {
-    use proxmox_backup::{
-        config::{verify::{self, VerificationJobConfig}, jobstate::{self, Job}},
-        tools::systemd::time::{parse_calendar_event, compute_next_event},
+
+    use proxmox_backup::config::verify::{
+        self,
+        VerificationJobConfig,
     };
+
     let config = match verify::config() {
         Err(err) => {
             eprintln!("unable to read verification job config - {}", err);
@@ -619,13 +638,6 @@ async fn schedule_datastore_verify_jobs() {
 }
 
 async fn schedule_task_log_rotate() {
-    use proxmox_backup::{
-        config::jobstate::{self, Job},
-        server::rotate_task_log_archive,
-    };
-    use proxmox_backup::server::WorkerTask;
-    use proxmox_backup::tools::systemd::time::{
-        parse_calendar_event, compute_next_event};
 
     let worker_type = "logrotate";
     let job_id = "task_archive";
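For the proxy, this commit is only an import reshuffle: the schedule_* functions keep using the same jobstate and calendar-event helpers, now pulled in once at the top of the file. As a rough, hedged sketch of the pattern those functions follow (helper names like job_due/maybe_run and the exact compute_next_event return type are assumptions, not code from this repository):

```rust
use anyhow::Error;
use proxmox_backup::server::jobstate::{self, Job};
use proxmox_backup::tools::systemd::time::{compute_next_event, parse_calendar_event};

// Hypothetical helper: is a scheduled job due, according to its statefile?
fn job_due(worker_type: &str, id: &str, event_str: &str) -> Result<bool, Error> {
    let event = parse_calendar_event(event_str)?;
    let last = jobstate::last_run_time(worker_type, id)?;
    let next = compute_next_event(&event, last, false)?; // return type assumed to be i64 here
    Ok(next <= proxmox::tools::time::epoch_i64())
}

// Hypothetical call site: take the per-job lock and hand it to a job runner
// such as do_sync_job() or do_verification_job() imported above.
fn maybe_run(worker_type: &str, id: &str, event_str: &str) -> Result<(), Error> {
    if job_due(worker_type, id, event_str)? {
        let job = Job::new(worker_type, id)?; // fails if another worker holds the lock
        let _ = job; // the real code passes the locked Job on to the runner
    }
    Ok(())
}
```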
diff --git a/src/config.rs b/src/config.rs
index ab7fc81a9b52d36491f0434b2010044b76141b43..65c0577e78f7f0d620348702cfca1cf5d160d252 100644
@@ -18,7 +18,6 @@ use crate::buildcfg;
 pub mod acl;
 pub mod cached_user_info;
 pub mod datastore;
-pub mod jobstate;
 pub mod network;
 pub mod remote;
 pub mod sync;
diff --git a/src/config/jobstate.rs b/src/config/jobstate.rs
deleted file mode 100644
index b609916..0000000
+++ /dev/null
@@ -1,262 +0,0 @@
-//! Generic JobState handling
-//!
-//! A 'Job' can have 3 states
-//!  - Created, when a schedule was created but never executed
-//!  - Started, when a job is running right now
-//!  - Finished, when a job was running in the past
-//!
-//! and is identified by 2 values: jobtype and jobname (e.g. 'syncjob' and 'myfirstsyncjob')
-//!
-//! This module Provides 2 helper structs to handle those coniditons
-//! 'Job' which handles locking and writing to a file
-//! 'JobState' which is the actual state
-//!
-//! an example usage would be
-//! ```no_run
-//! # use anyhow::{bail, Error};
-//! # use proxmox_backup::server::TaskState;
-//! # use proxmox_backup::config::jobstate::*;
-//! # fn some_code() -> TaskState { TaskState::OK { endtime: 0 } }
-//! # fn code() -> Result<(), Error> {
-//! // locks the correct file under /var/lib
-//! // or fails if someone else holds the lock
-//! let mut job = match Job::new("jobtype", "jobname") {
-//!     Ok(job) => job,
-//!     Err(err) => bail!("could not lock jobstate"),
-//! };
-//!
-//! // job holds the lock, we can start it
-//! job.start("someupid")?;
-//! // do something
-//! let task_state = some_code();
-//! job.finish(task_state)?;
-//!
-//! // release the lock
-//! drop(job);
-//! # Ok(())
-//! # }
-//!
-//! ```
-use std::fs::File;
-use std::path::{Path, PathBuf};
-use std::time::Duration;
-
-use anyhow::{bail, format_err, Error};
-use proxmox::tools::fs::{
-    create_path, file_read_optional_string, open_file_locked, replace_file, CreateOptions,
-};
-use serde::{Deserialize, Serialize};
-
-use crate::server::{upid_read_status, worker_is_active_local, TaskState, UPID};
-
-#[serde(rename_all = "kebab-case")]
-#[derive(Serialize, Deserialize)]
-/// Represents the State of a specific Job
-pub enum JobState {
-    /// A job was created at 'time', but never started/finished
-    Created { time: i64 },
-    /// The Job was last started in 'upid',
-    Started { upid: String },
-    /// The Job was last started in 'upid', which finished with 'state'
-    Finished { upid: String, state: TaskState },
-}
-
-/// Represents a Job and holds the correct lock
-pub struct Job {
-    jobtype: String,
-    jobname: String,
-    /// The State of the job
-    pub state: JobState,
-    _lock: File,
-}
-
-const JOB_STATE_BASEDIR: &str = "/var/lib/proxmox-backup/jobstates";
-
-/// Create jobstate stat dir with correct permission
-pub fn create_jobstate_dir() -> Result<(), Error> {
-    let backup_user = crate::backup::backup_user()?;
-    let opts = CreateOptions::new()
-        .owner(backup_user.uid)
-        .group(backup_user.gid);
-
-    create_path(JOB_STATE_BASEDIR, None, Some(opts))
-        .map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
-
-    Ok(())
-}
-
-fn get_path(jobtype: &str, jobname: &str) -> PathBuf {
-    let mut path = PathBuf::from(JOB_STATE_BASEDIR);
-    path.push(format!("{}-{}.json", jobtype, jobname));
-    path
-}
-
-fn get_lock<P>(path: P) -> Result<File, Error>
-where
-    P: AsRef<Path>,
-{
-    let mut path = path.as_ref().to_path_buf();
-    path.set_extension("lck");
-    let lock = open_file_locked(&path, Duration::new(10, 0), true)?;
-    let backup_user = crate::backup::backup_user()?;
-    nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
-    Ok(lock)
-}
-
-/// Removes the statefile of a job, this is useful if we delete a job
-pub fn remove_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
-    let mut path = get_path(jobtype, jobname);
-    let _lock = get_lock(&path)?;
-    std::fs::remove_file(&path).map_err(|err| {
-        format_err!(
-            "cannot remove statefile for {} - {}: {}",
-            jobtype,
-            jobname,
-            err
-        )
-    })?;
-    path.set_extension("lck");
-    // ignore errors
-    let _ = std::fs::remove_file(&path).map_err(|err| {
-        format_err!(
-            "cannot remove lockfile for {} - {}: {}",
-            jobtype,
-            jobname,
-            err
-        )
-    });
-    Ok(())
-}
-
-/// Creates the statefile with the state 'Created'
-/// overwrites if it exists already
-pub fn create_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
-    let mut job = Job::new(jobtype, jobname)?;
-    job.write_state()
-}
-
-/// Returns the last run time of a job by reading the statefile
-/// Note that this is not locked
-pub fn last_run_time(jobtype: &str, jobname: &str) -> Result<i64, Error> {
-    match JobState::load(jobtype, jobname)? {
-        JobState::Created { time } => Ok(time),
-        JobState::Started { upid } | JobState::Finished { upid, .. } => {
-            let upid: UPID = upid
-                .parse()
-                .map_err(|err| format_err!("could not parse upid from state: {}", err))?;
-            Ok(upid.starttime)
-        }
-    }
-}
-
-impl JobState {
-    /// Loads and deserializes the jobstate from type and name.
-    /// When the loaded state indicates a started UPID,
-    /// we go and check if it has already stopped, and
-    /// returning the correct state.
-    ///
-    /// This does not update the state in the file.
-    pub fn load(jobtype: &str, jobname: &str) -> Result<Self, Error> {
-        if let Some(state) = file_read_optional_string(get_path(jobtype, jobname))? {
-            match serde_json::from_str(&state)? {
-                JobState::Started { upid } => {
-                    let parsed: UPID = upid
-                        .parse()
-                        .map_err(|err| format_err!("error parsing upid: {}", err))?;
-
-                    if !worker_is_active_local(&parsed) {
-                        let state = upid_read_status(&parsed)
-                            .map_err(|err| format_err!("error reading upid log status: {}", err))?;
-
-                        Ok(JobState::Finished { upid, state })
-                    } else {
-                        Ok(JobState::Started { upid })
-                    }
-                }
-                other => Ok(other),
-            }
-        } else {
-            Ok(JobState::Created {
-                time: proxmox::tools::time::epoch_i64() - 30,
-            })
-        }
-    }
-}
-
-impl Job {
-    /// Creates a new instance of a job with the correct lock held
-    /// (will be hold until the job is dropped again).
-    ///
-    /// This does not load the state from the file, to do that,
-    /// 'load' must be called
-    pub fn new(jobtype: &str, jobname: &str) -> Result<Self, Error> {
-        let path = get_path(jobtype, jobname);
-
-        let _lock = get_lock(&path)?;
-
-        Ok(Self {
-            jobtype: jobtype.to_string(),
-            jobname: jobname.to_string(),
-            state: JobState::Created {
-                time: proxmox::tools::time::epoch_i64(),
-            },
-            _lock,
-        })
-    }
-
-    /// Start the job and update the statefile accordingly
-    /// Fails if the job was already started
-    pub fn start(&mut self, upid: &str) -> Result<(), Error> {
-        match self.state {
-            JobState::Started { .. } => {
-                bail!("cannot start job that is started!");
-            }
-            _ => {}
-        }
-
-        self.state = JobState::Started {
-            upid: upid.to_string(),
-        };
-
-        self.write_state()
-    }
-
-    /// Finish the job and update the statefile accordingly with the given taskstate
-    /// Fails if the job was not yet started
-    pub fn finish(&mut self, state: TaskState) -> Result<(), Error> {
-        let upid = match &self.state {
-            JobState::Created { .. } => bail!("cannot finish when not started"),
-            JobState::Started { upid } => upid,
-            JobState::Finished { upid, .. } => upid,
-        }
-        .to_string();
-
-        self.state = JobState::Finished { upid, state };
-
-        self.write_state()
-    }
-
-    pub fn jobtype(&self) -> &str {
-        &self.jobtype
-    }
-
-    pub fn jobname(&self) -> &str {
-        &self.jobname
-    }
-
-    fn write_state(&mut self) -> Result<(), Error> {
-        let serialized = serde_json::to_string(&self.state)?;
-        let path = get_path(&self.jobtype, &self.jobname);
-
-        let backup_user = crate::backup::backup_user()?;
-        let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644);
-        // set the correct owner/group/permissions while saving file
-        // owner(rw) = backup, group(r)= backup
-        let options = CreateOptions::new()
-            .perm(mode)
-            .owner(backup_user.uid)
-            .group(backup_user.gid);
-
-        replace_file(path, serialized.as_bytes(), options)
-    }
-}
diff --git a/src/server.rs b/src/server.rs
index 2712e929feb1c16ad9bab7e6d576d3ceee4e06cb..dbaec6455ef9bf9d42b831176a73f189a4f22d01 100644
@@ -30,3 +30,7 @@ pub mod formatter;
 #[macro_use]
 pub mod rest;
 
+pub mod jobstate;
+
+mod verify_job;
+pub use verify_job::*;
diff --git a/src/server/jobstate.rs b/src/server/jobstate.rs
new file mode 100644
index 0000000..b609916
--- /dev/null
@@ -0,0 +1,262 @@
+//! Generic JobState handling
+//!
+//! A 'Job' can have 3 states
+//!  - Created, when a schedule was created but never executed
+//!  - Started, when a job is running right now
+//!  - Finished, when a job was running in the past
+//!
+//! and is identified by 2 values: jobtype and jobname (e.g. 'syncjob' and 'myfirstsyncjob')
+//!
+//! This module provides 2 helper structs to handle those conditions:
+//! 'Job', which handles locking and writing to a file, and
+//! 'JobState', which is the actual state.
+//!
+//! An example usage would be:
+//! ```no_run
+//! # use anyhow::{bail, Error};
+//! # use proxmox_backup::server::TaskState;
+//! # use proxmox_backup::server::jobstate::*;
+//! # fn some_code() -> TaskState { TaskState::OK { endtime: 0 } }
+//! # fn code() -> Result<(), Error> {
+//! // locks the correct file under /var/lib
+//! // or fails if someone else holds the lock
+//! let mut job = match Job::new("jobtype", "jobname") {
+//!     Ok(job) => job,
+//!     Err(err) => bail!("could not lock jobstate"),
+//! };
+//!
+//! // job holds the lock, we can start it
+//! job.start("someupid")?;
+//! // do something
+//! let task_state = some_code();
+//! job.finish(task_state)?;
+//!
+//! // release the lock
+//! drop(job);
+//! # Ok(())
+//! # }
+//!
+//! ```
+use std::fs::File;
+use std::path::{Path, PathBuf};
+use std::time::Duration;
+
+use anyhow::{bail, format_err, Error};
+use proxmox::tools::fs::{
+    create_path, file_read_optional_string, open_file_locked, replace_file, CreateOptions,
+};
+use serde::{Deserialize, Serialize};
+
+use crate::server::{upid_read_status, worker_is_active_local, TaskState, UPID};
+
+#[serde(rename_all = "kebab-case")]
+#[derive(Serialize, Deserialize)]
+/// Represents the State of a specific Job
+pub enum JobState {
+    /// A job was created at 'time', but never started/finished
+    Created { time: i64 },
+    /// The Job was last started in 'upid',
+    Started { upid: String },
+    /// The Job was last started in 'upid', which finished with 'state'
+    Finished { upid: String, state: TaskState },
+}
+
+/// Represents a Job and holds the correct lock
+pub struct Job {
+    jobtype: String,
+    jobname: String,
+    /// The State of the job
+    pub state: JobState,
+    _lock: File,
+}
+
+const JOB_STATE_BASEDIR: &str = "/var/lib/proxmox-backup/jobstates";
+
+/// Create the jobstate dir with correct permissions
+pub fn create_jobstate_dir() -> Result<(), Error> {
+    let backup_user = crate::backup::backup_user()?;
+    let opts = CreateOptions::new()
+        .owner(backup_user.uid)
+        .group(backup_user.gid);
+
+    create_path(JOB_STATE_BASEDIR, None, Some(opts))
+        .map_err(|err: Error| format_err!("unable to create jobstate dir - {}", err))?;
+
+    Ok(())
+}
+
+fn get_path(jobtype: &str, jobname: &str) -> PathBuf {
+    let mut path = PathBuf::from(JOB_STATE_BASEDIR);
+    path.push(format!("{}-{}.json", jobtype, jobname));
+    path
+}
+
+fn get_lock<P>(path: P) -> Result<File, Error>
+where
+    P: AsRef<Path>,
+{
+    let mut path = path.as_ref().to_path_buf();
+    path.set_extension("lck");
+    let lock = open_file_locked(&path, Duration::new(10, 0), true)?;
+    let backup_user = crate::backup::backup_user()?;
+    nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
+    Ok(lock)
+}
+
+/// Removes the statefile of a job; this is useful when a job gets deleted
+pub fn remove_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
+    let mut path = get_path(jobtype, jobname);
+    let _lock = get_lock(&path)?;
+    std::fs::remove_file(&path).map_err(|err| {
+        format_err!(
+            "cannot remove statefile for {} - {}: {}",
+            jobtype,
+            jobname,
+            err
+        )
+    })?;
+    path.set_extension("lck");
+    // ignore errors
+    let _ = std::fs::remove_file(&path).map_err(|err| {
+        format_err!(
+            "cannot remove lockfile for {} - {}: {}",
+            jobtype,
+            jobname,
+            err
+        )
+    });
+    Ok(())
+}
+
+/// Creates the statefile with the state 'Created'
+/// overwrites if it exists already
+pub fn create_state_file(jobtype: &str, jobname: &str) -> Result<(), Error> {
+    let mut job = Job::new(jobtype, jobname)?;
+    job.write_state()
+}
+
+/// Returns the last run time of a job by reading the statefile
+/// Note that this is not locked
+pub fn last_run_time(jobtype: &str, jobname: &str) -> Result<i64, Error> {
+    match JobState::load(jobtype, jobname)? {
+        JobState::Created { time } => Ok(time),
+        JobState::Started { upid } | JobState::Finished { upid, .. } => {
+            let upid: UPID = upid
+                .parse()
+                .map_err(|err| format_err!("could not parse upid from state: {}", err))?;
+            Ok(upid.starttime)
+        }
+    }
+}
+
+impl JobState {
+    /// Loads and deserializes the jobstate from type and name.
+    /// When the loaded state indicates a started UPID,
+    /// we check whether it has already stopped and
+    /// return the correct state.
+    ///
+    /// This does not update the state in the file.
+    pub fn load(jobtype: &str, jobname: &str) -> Result<Self, Error> {
+        if let Some(state) = file_read_optional_string(get_path(jobtype, jobname))? {
+            match serde_json::from_str(&state)? {
+                JobState::Started { upid } => {
+                    let parsed: UPID = upid
+                        .parse()
+                        .map_err(|err| format_err!("error parsing upid: {}", err))?;
+
+                    if !worker_is_active_local(&parsed) {
+                        let state = upid_read_status(&parsed)
+                            .map_err(|err| format_err!("error reading upid log status: {}", err))?;
+
+                        Ok(JobState::Finished { upid, state })
+                    } else {
+                        Ok(JobState::Started { upid })
+                    }
+                }
+                other => Ok(other),
+            }
+        } else {
+            Ok(JobState::Created {
+                time: proxmox::tools::time::epoch_i64() - 30,
+            })
+        }
+    }
+}
+
+impl Job {
+    /// Creates a new instance of a job with the correct lock held
+    /// (will be held until the job is dropped again).
+    ///
+    /// This does not load the state from the file; to do that,
+    /// 'load' must be called
+    pub fn new(jobtype: &str, jobname: &str) -> Result<Self, Error> {
+        let path = get_path(jobtype, jobname);
+
+        let _lock = get_lock(&path)?;
+
+        Ok(Self {
+            jobtype: jobtype.to_string(),
+            jobname: jobname.to_string(),
+            state: JobState::Created {
+                time: proxmox::tools::time::epoch_i64(),
+            },
+            _lock,
+        })
+    }
+
+    /// Start the job and update the statefile accordingly
+    /// Fails if the job was already started
+    pub fn start(&mut self, upid: &str) -> Result<(), Error> {
+        match self.state {
+            JobState::Started { .. } => {
+                bail!("cannot start job that is started!");
+            }
+            _ => {}
+        }
+
+        self.state = JobState::Started {
+            upid: upid.to_string(),
+        };
+
+        self.write_state()
+    }
+
+    /// Finish the job and update the statefile accordingly with the given taskstate
+    /// Fails if the job was not yet started
+    pub fn finish(&mut self, state: TaskState) -> Result<(), Error> {
+        let upid = match &self.state {
+            JobState::Created { .. } => bail!("cannot finish when not started"),
+            JobState::Started { upid } => upid,
+            JobState::Finished { upid, .. } => upid,
+        }
+        .to_string();
+
+        self.state = JobState::Finished { upid, state };
+
+        self.write_state()
+    }
+
+    pub fn jobtype(&self) -> &str {
+        &self.jobtype
+    }
+
+    pub fn jobname(&self) -> &str {
+        &self.jobname
+    }
+
+    fn write_state(&mut self) -> Result<(), Error> {
+        let serialized = serde_json::to_string(&self.state)?;
+        let path = get_path(&self.jobtype, &self.jobname);
+
+        let backup_user = crate::backup::backup_user()?;
+        let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644);
+        // set the correct owner/group/permissions while saving file
+        // owner(rw) = backup, group(r)= backup
+        let options = CreateOptions::new()
+            .perm(mode)
+            .owner(backup_user.uid)
+            .group(backup_user.gid);
+
+        replace_file(path, serialized.as_bytes(), options)
+    }
+}
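As a closing illustration of the JobState::load behaviour documented above (a stale 'Started' entry whose UPID no longer belongs to an active local worker is resolved into 'Finished' via the task log), here is a small read-only sketch in the style of the status APIs earlier in this commit (the job id is a placeholder):

```rust
use anyhow::Error;
use proxmox_backup::server::jobstate::JobState;

fn print_sync_job_status(id: &str) -> Result<(), Error> {
    match JobState::load("syncjob", id)? {
        JobState::Created { time } => println!("never ran (created at epoch {})", time),
        JobState::Started { upid } => println!("currently running: {}", upid),
        // `state` is the TaskState read back from the finished task's log
        JobState::Finished { upid, .. } => println!("last run {} has finished", upid),
    }
    Ok(())
}
```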