www/DataStoreStatus.js: display swap stats
src/bin/proxmox-backup-proxy.rs
index 788c3e9df27b7df2cb2b9896c8af4bd0fb1dd016..748958a751af10010ba0882bca2d7442df03ef18 100644
@@ -1,24 +1,22 @@
 use std::sync::Arc;
 
-use failure::*;
+use anyhow::{bail, format_err, Error};
 use futures::*;
 use hyper;
 use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};
 
-use proxmox::tools::try_block;
+use proxmox::try_block;
 use proxmox::api::RpcEnvironmentType;
 
 use proxmox_backup::configdir;
 use proxmox_backup::buildcfg;
 use proxmox_backup::server;
-use proxmox_backup::config;
 use proxmox_backup::tools::daemon;
 use proxmox_backup::server::{ApiConfig, rest::*};
 use proxmox_backup::auth_helpers::*;
 
-#[tokio::main]
-async fn main() {
-    if let Err(err) = run().await {
+fn main() {
+    if let Err(err) = proxmox_backup::tools::runtime::main(run()) {
         eprintln!("Error: {}", err);
         std::process::exit(-1);
     }
@@ -32,13 +30,11 @@ async fn run() -> Result<(), Error> {
         bail!("unable to inititialize syslog - {}", err);
     }
 
-    config::update_self_signed_cert(false)?;
-
     let _ = public_auth_key(); // load with lazy_static
     let _ = csrf_secret(); // load with lazy_static
 
     let mut config = ApiConfig::new(
-        buildcfg::JS_DIR, &proxmox_backup::api2::ROUTER, RpcEnvironmentType::PUBLIC);
+        buildcfg::JS_DIR, &proxmox_backup::api2::ROUTER, RpcEnvironmentType::PUBLIC)?;
 
     // add default dirs which includes jquery and bootstrap
     // my $base = '/usr/share/libpve-http-server-perl';
@@ -111,8 +107,556 @@ async fn run() -> Result<(), Error> {
         bail!("unable to start daemon - {}", err);
     }
 
+    start_task_scheduler();
+    start_stat_generator();
+
     server.await?;
+    log::info!("server shutting down, waiting for active workers to complete");
+    proxmox_backup::server::last_worker_future().await?;
     log::info!("done - exit server");
 
     Ok(())
 }
+
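+// Spawn the statistic generator as a background task; it is aborted once the
+// server shutdown future resolves.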
+fn start_stat_generator() {
+    let abort_future = server::shutdown_future();
+    let future = Box::pin(run_stat_generator());
+    let task = futures::future::select(future, abort_future);
+    tokio::spawn(task.map(|_| ()));
+}
+
+fn start_task_scheduler() {
+    let abort_future = server::shutdown_future();
+    let future = Box::pin(run_task_scheduler());
+    let task = futures::future::select(future, abort_future);
+    tokio::spawn(task.map(|_| ()));
+}
+
+use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH};
+
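+// Compute the Instant at which the next full minute (relative to the UNIX epoch) starts.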
+fn next_minute() -> Result<Instant, Error> {
+    let epoch_now = SystemTime::now().duration_since(UNIX_EPOCH)?;
+    let epoch_next = Duration::from_secs((epoch_now.as_secs()/60 + 1)*60);
+    Ok(Instant::now() + epoch_next - epoch_now)
+}
+
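+// Wake up once per minute and run the scheduled tasks (after an initial 1..2
+// minute delay). Panics and errors from schedule_tasks() are caught and logged
+// so the scheduler loop keeps running.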
+async fn run_task_scheduler() {
+
+    let mut count: usize = 0;
+
+    loop {
+        count += 1;
+
+        let delay_target = match next_minute() { // try to run every minute
+            Ok(d) => d,
+            Err(err) => {
+                eprintln!("task scheduler: compute next minute failed - {}", err);
+                tokio::time::delay_until(tokio::time::Instant::from_std(Instant::now() + Duration::from_secs(60))).await;
+                continue;
+            }
+        };
+
+        if count > 2 { // wait 1..2 minutes before starting
+            match schedule_tasks().catch_unwind().await {
+                Err(panic) => {
+                    match panic.downcast::<&str>() {
+                        Ok(msg) => {
+                            eprintln!("task scheduler panic: {}", msg);
+                        }
+                        Err(_) => {
+                            eprintln!("task scheduler panic - unknown type");
+                        }
+                    }
+                }
+                Ok(Err(err)) => {
+                    eprintln!("task scheduler failed - {:?}", err);
+                }
+                Ok(Ok(_)) => {}
+            }
+        }
+
+        tokio::time::delay_until(tokio::time::Instant::from_std(delay_target)).await;
+    }
+}
+
+async fn schedule_tasks() -> Result<(), Error> {
+
+    schedule_datastore_garbage_collection().await;
+    schedule_datastore_prune().await;
+    schedule_datastore_sync_jobs().await;
+
+    Ok(())
+}
+
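+// Return the most recently started task entry matching the given worker type
+// and worker ID, if any.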
+fn lookup_last_worker(worker_type: &str, worker_id: &str) -> Result<Option<server::UPID>, Error> {
+
+    let list = proxmox_backup::server::read_task_list()?;
+
+    let mut last: Option<&server::UPID> = None;
+
+    for entry in list.iter() {
+        if entry.upid.worker_type == worker_type {
+            if let Some(ref id) = entry.upid.worker_id {
+                if id == worker_id {
+                    match last {
+                        Some(ref upid) => {
+                            if upid.starttime < entry.upid.starttime {
+                                last = Some(&entry.upid)
+                            }
+                        }
+                        None => {
+                            last = Some(&entry.upid)
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    Ok(last.cloned())
+}
+
+
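+// For each datastore with a configured gc_schedule, start a garbage collection
+// worker when the schedule is due and no GC is currently running on that store.
+// The last run is taken from the stored GC status or the task list.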
+async fn schedule_datastore_garbage_collection() {
+
+    use proxmox_backup::backup::DataStore;
+    use proxmox_backup::server::{UPID, WorkerTask};
+    use proxmox_backup::config::datastore::{self, DataStoreConfig};
+    use proxmox_backup::tools::systemd::time::{
+        parse_calendar_event, compute_next_event};
+
+    let config = match datastore::config() {
+        Err(err) => {
+            eprintln!("unable to read datastore config - {}", err);
+            return;
+        }
+        Ok((config, _digest)) => config,
+    };
+
+    for (store, (_, store_config)) in config.sections {
+        let datastore = match DataStore::lookup_datastore(&store) {
+            Ok(datastore) => datastore,
+            Err(err) => {
+                eprintln!("lookup_datastore failed - {}", err);
+                continue;
+            }
+        };
+
+        let store_config: DataStoreConfig = match serde_json::from_value(store_config) {
+            Ok(c) => c,
+            Err(err) => {
+                eprintln!("datastore config from_value failed - {}", err);
+                continue;
+            }
+        };
+
+        let event_str = match store_config.gc_schedule {
+            Some(event_str) => event_str,
+            None => continue,
+        };
+
+        let event = match parse_calendar_event(&event_str) {
+            Ok(event) => event,
+            Err(err) => {
+                eprintln!("unable to parse schedule '{}' - {}", event_str, err);
+                continue;
+            }
+        };
+
+        if datastore.garbage_collection_running() { continue; }
+
+        let worker_type = "garbage_collection";
+
+        let stat = datastore.last_gc_status();
+        let last = if let Some(upid_str) = stat.upid {
+            match upid_str.parse::<UPID>() {
+                Ok(upid) => upid.starttime,
+                Err(err) => {
+                    eprintln!("unable to parse upid '{}' - {}", upid_str, err);
+                    continue;
+                }
+            }
+        } else {
+            match lookup_last_worker(worker_type, &store) {
+                Ok(Some(upid)) => upid.starttime,
+                Ok(None) => 0,
+                Err(err) => {
+                    eprintln!("lookup_last_job_start failed: {}", err);
+                    continue;
+                }
+            }
+        };
+
+        let next = match compute_next_event(&event, last, false) {
+            Ok(next) => next,
+            Err(err) => {
+                eprintln!("compute_next_event for '{}' failed - {}", event_str, err);
+                continue;
+            }
+        };
+        let now = match SystemTime::now().duration_since(UNIX_EPOCH) {
+            Ok(epoch_now) => epoch_now.as_secs() as i64,
+            Err(err) => {
+                eprintln!("query system time failed - {}", err);
+                continue;
+            }
+        };
+        if next > now { continue; }
+
+        let store2 = store.clone();
+
+        if let Err(err) = WorkerTask::new_thread(
+            worker_type,
+            Some(store.clone()),
+            "backup@pam",
+            false,
+            move |worker| {
+                worker.log(format!("starting garbage collection on store {}", store));
+                worker.log(format!("task triggered by schedule '{}'", event_str));
+                datastore.garbage_collection(&worker)
+            }
+        ) {
+            eprintln!("unable to start garbage collection on store {} - {}", store2, err);
+        }
+    }
+}
+
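+// For each datastore with a configured prune_schedule and at least one keep
+// option, start a prune worker when the schedule is due; snapshots not covered
+// by the retention options are removed.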
+async fn schedule_datastore_prune() {
+
+    use proxmox_backup::backup::{
+        PruneOptions, DataStore, BackupGroup, BackupDir, compute_prune_info};
+    use proxmox_backup::server::{WorkerTask};
+    use proxmox_backup::config::datastore::{self, DataStoreConfig};
+    use proxmox_backup::tools::systemd::time::{
+        parse_calendar_event, compute_next_event};
+
+    let config = match datastore::config() {
+        Err(err) => {
+            eprintln!("unable to read datastore config - {}", err);
+            return;
+        }
+        Ok((config, _digest)) => config,
+    };
+
+    for (store, (_, store_config)) in config.sections {
+        let datastore = match DataStore::lookup_datastore(&store) {
+            Ok(datastore) => datastore,
+            Err(err) => {
+                eprintln!("lookup_datastore '{}' failed - {}", store, err);
+                continue;
+            }
+        };
+
+        let store_config: DataStoreConfig = match serde_json::from_value(store_config) {
+            Ok(c) => c,
+            Err(err) => {
+                eprintln!("datastore '{}' config from_value failed - {}", store, err);
+                continue;
+            }
+        };
+
+        let event_str = match store_config.prune_schedule {
+            Some(event_str) => event_str,
+            None => continue,
+        };
+
+        let prune_options = PruneOptions {
+            keep_last: store_config.keep_last,
+            keep_hourly: store_config.keep_hourly,
+            keep_daily:  store_config.keep_daily,
+            keep_weekly: store_config.keep_weekly,
+            keep_monthly: store_config.keep_monthly,
+            keep_yearly: store_config.keep_yearly,
+        };
+
+        if !prune_options.keeps_something() { // no prune settings - keep all
+            continue;
+        }
+
+        let event = match parse_calendar_event(&event_str) {
+            Ok(event) => event,
+            Err(err) => {
+                eprintln!("unable to parse schedule '{}' - {}", event_str, err);
+                continue;
+            }
+        };
+
+        //fixme: if last_prune_job_still_running { continue; }
+
+        let worker_type = "prune";
+
+        let last = match lookup_last_worker(worker_type, &store) {
+            Ok(Some(upid)) => upid.starttime,
+            Ok(None) => 0,
+            Err(err) => {
+                eprintln!("lookup_last_job_start failed: {}", err);
+                continue;
+            }
+        };
+
+        let next = match compute_next_event(&event, last, false) {
+            Ok(next) => next,
+            Err(err) => {
+                eprintln!("compute_next_event for '{}' failed - {}", event_str, err);
+                continue;
+            }
+        };
+
+        let now = match SystemTime::now().duration_since(UNIX_EPOCH) {
+            Ok(epoch_now) => epoch_now.as_secs() as i64,
+            Err(err) => {
+                eprintln!("query system time failed - {}", err);
+                continue;
+            }
+        };
+        if next > now { continue; }
+
+        let store2 = store.clone();
+
+        if let Err(err) = WorkerTask::new_thread(
+            worker_type,
+            Some(store.clone()),
+            "backup@pam",
+            false,
+            move |worker| {
+                worker.log(format!("Starting datastore prune on store \"{}\"", store));
+                worker.log(format!("task triggered by schedule '{}'", event_str));
+                worker.log(format!("retention options: {}", prune_options.cli_options_string()));
+
+                let base_path = datastore.base_path();
+
+                let groups = BackupGroup::list_groups(&base_path)?;
+                for group in groups {
+                    let list = group.list_backups(&base_path)?;
+                    let mut prune_info = compute_prune_info(list, &prune_options)?;
+                    prune_info.reverse(); // delete older snapshots first
+
+                    worker.log(format!("Starting prune on store \"{}\" group \"{}/{}\"",
+                                       store, group.backup_type(), group.backup_id()));
+
+                    for (info, keep) in prune_info {
+                        worker.log(format!(
+                            "{} {}/{}/{}",
+                            if keep { "keep" } else { "remove" },
+                            group.backup_type(), group.backup_id(),
+                            BackupDir::backup_time_to_string(info.backup_dir.backup_time())));
+
+                        if !keep {
+                            datastore.remove_backup_dir(&info.backup_dir)?;
+                        }
+                    }
+                }
+
+                Ok(())
+            }
+        ) {
+            eprintln!("unable to start datastore prune on store {} - {}", store2, err);
+        }
+    }
+}
+
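+// For each sync job with a configured schedule that is due, spawn a worker that
+// authenticates against the configured remote and pulls its store into the
+// local target datastore.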
+async fn schedule_datastore_sync_jobs() {
+
+    use proxmox_backup::{
+        backup::DataStore,
+        client::{ HttpClient, HttpClientOptions, BackupRepository, pull::pull_store },
+        server::{ WorkerTask },
+        config::{ sync::{self, SyncJobConfig}, remote::{self, Remote} },
+        tools::systemd::time::{ parse_calendar_event, compute_next_event },
+    };
+
+    let config = match sync::config() {
+        Err(err) => {
+            eprintln!("unable to read sync job config - {}", err);
+            return;
+        }
+        Ok((config, _digest)) => config,
+    };
+
+    let remote_config = match remote::config() {
+        Err(err) => {
+            eprintln!("unable to read remote config - {}", err);
+            return;
+        }
+        Ok((config, _digest)) => config,
+    };
+
+    for (job_id, (_, job_config)) in config.sections {
+        let job_config: SyncJobConfig = match serde_json::from_value(job_config) {
+            Ok(c) => c,
+            Err(err) => {
+                eprintln!("sync job config from_value failed - {}", err);
+                continue;
+            }
+        };
+
+        let event_str = match job_config.schedule {
+            Some(ref event_str) => event_str.clone(),
+            None => continue,
+        };
+
+        let event = match parse_calendar_event(&event_str) {
+            Ok(event) => event,
+            Err(err) => {
+                eprintln!("unable to parse schedule '{}' - {}", event_str, err);
+                continue;
+            }
+        };
+
+        //fixme: if last_sync_job_still_running { continue; }
+
+        let worker_type = "sync";
+
+        let last = match lookup_last_worker(worker_type, &job_config.store) {
+            Ok(Some(upid)) => upid.starttime,
+            Ok(None) => 0,
+            Err(err) => {
+                eprintln!("lookup_last_job_start failed: {}", err);
+                continue;
+            }
+        };
+
+        let next = match compute_next_event(&event, last, false) {
+            Ok(next) => next,
+            Err(err) => {
+                eprintln!("compute_next_event for '{}' failed - {}", event_str, err);
+                continue;
+            }
+        };
+
+        let now = match SystemTime::now().duration_since(UNIX_EPOCH) {
+            Ok(epoch_now) => epoch_now.as_secs() as i64,
+            Err(err) => {
+                eprintln!("query system time failed - {}", err);
+                continue;
+            }
+        };
+        if next > now { continue; }
+
+
+        let job_id2 = job_id.clone();
+
+        let tgt_store = match DataStore::lookup_datastore(&job_config.store) {
+            Ok(datastore) => datastore,
+            Err(err) => {
+                eprintln!("lookup_datastore '{}' failed - {}", job_config.store, err);
+                continue;
+            }
+        };
+
+        let remote: Remote = match remote_config.lookup("remote", &job_config.remote) {
+            Ok(remote) => remote,
+            Err(err) => {
+                eprintln!("remote_config lookup failed: {}", err);
+                continue;
+            }
+        };
+
+        let username = String::from("backup@pam");
+
+        let delete = job_config.remove_vanished.unwrap_or(true);
+
+        if let Err(err) = WorkerTask::spawn(
+            worker_type,
+            Some(job_config.store.clone()),
+            &username.clone(),
+            false,
+            move |worker| async move {
+                worker.log(format!("Starting datastore sync job '{}'", job_id));
+                worker.log(format!("task triggered by schedule '{}'", event_str));
+                worker.log(format!("Sync datastore '{}' from '{}/{}'",
+                                   job_config.store, job_config.remote, job_config.remote_store));
+
+                let options = HttpClientOptions::new()
+                    .password(Some(remote.password.clone()))
+                    .fingerprint(remote.fingerprint.clone());
+
+                let client = HttpClient::new(&remote.host, &remote.userid, options)?;
+                let _auth_info = client.login() // make sure we can auth
+                    .await
+                    .map_err(|err| format_err!("remote connection to '{}' failed - {}", remote.host, err))?;
+
+                let src_repo = BackupRepository::new(Some(remote.userid), Some(remote.host), job_config.remote_store);
+
+                pull_store(&worker, &client, &src_repo, tgt_store, delete, username).await?;
+
+                Ok(())
+            }
+        ) {
+            eprintln!("unable to start datastore sync job {} - {}", job_id2, err);
+        }
+    }
+}
+
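+// Collect host statistics every 10 seconds.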
+async fn run_stat_generator() {
+
+    loop {
+        let delay_target = Instant::now() + Duration::from_secs(10);
+
+        generate_host_stats().await;
+
+        tokio::time::delay_until(tokio::time::Instant::from_std(delay_target)).await;
+    }
+
+}
+
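+// Read CPU, memory and network counters from procfs and write them to the RRD
+// database; network traffic is summed over physical NICs only.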
+async fn generate_host_stats() {
+    use proxmox::sys::linux::procfs::{
+        read_meminfo, read_proc_stat, read_proc_net_dev};
+    use proxmox_backup::rrd;
+
+    proxmox_backup::tools::runtime::block_in_place(move || {
+
+        match read_proc_stat() {
+            Ok(stat) => {
+                if let Err(err) = rrd::update_value("host/cpu", stat.cpu, rrd::DST::Gauge) {
+                    eprintln!("rrd::update_value 'host/cpu' failed - {}", err);
+                }
+            }
+            Err(err) => {
+                eprintln!("read_proc_stat failed - {}", err);
+            }
+        }
+        match read_meminfo() {
+            Ok(meminfo) => {
+                if let Err(err) = rrd::update_value("host/memtotal", meminfo.memtotal as f64, rrd::DST::Gauge) {
+                    eprintln!("rrd::update_value 'host/memtotal' failed - {}", err);
+                }
+                if let Err(err) = rrd::update_value("host/memused", meminfo.memused as f64, rrd::DST::Gauge) {
+                    eprintln!("rrd::update_value 'host/memused' failed - {}", err);
+                }
+                if let Err(err) = rrd::update_value("host/swaptotal", meminfo.swaptotal as f64, rrd::DST::Gauge) {
+                    eprintln!("rrd::update_value 'host/swaptotal' failed - {}", err);
+                }
+                if let Err(err) = rrd::update_value("host/swapused", meminfo.swapused as f64, rrd::DST::Gauge) {
+                    eprintln!("rrd::update_value 'host/swapused' failed - {}", err);
+                }
+            }
+            Err(err) => {
+                eprintln!("read_meminfo failed - {}", err);
+            }
+        }
+
+        match read_proc_net_dev() {
+            Ok(netdev) => {
+                use proxmox_backup::config::network::is_physical_nic;
+                let mut netin = 0;
+                let mut netout = 0;
+                for item in netdev {
+                    if !is_physical_nic(&item.device) { continue; }
+                    netin += item.receive;
+                    netout += item.send;
+                }
+                if let Err(err) = rrd::update_value("host/netin", netin as f64, rrd::DST::Derive) {
+                    eprintln!("rrd::update_value 'host/netin' failed - {}", err);
+                }
+                if let Err(err) = rrd::update_value("host/netout", netout as f64, rrd::DST::Derive) {
+                    eprintln!("rrd::update_value 'host/netout' failed - {}", err);
+                }
+            }
+            Err(err) => {
+                eprintln!("read_prox_net_dev failed - {}", err);
+            }
+        }
+    });
+}