]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/bin/proxmox-backup-proxy.rs
www/DataStoreStatus.js: display swap stats
[proxmox-backup.git] / src / bin / proxmox-backup-proxy.rs
index 2e56a435a978af25465f6839de6f5270c7a42abe..748958a751af10010ba0882bca2d7442df03ef18 100644 (file)
@@ -108,6 +108,7 @@ async fn run() -> Result<(), Error> {
     }
 
     start_task_scheduler();
+    start_stat_generator();
 
     server.await?;
     log::info!("server shutting down, waiting for active workers to complete");
@@ -117,6 +118,13 @@ async fn run() -> Result<(), Error> {
     Ok(())
 }
 
+fn start_stat_generator() {
+    let abort_future = server::shutdown_future();
+    let future = Box::pin(run_stat_generator());
+    let task = futures::future::select(future, abort_future);
+    tokio::spawn(task.map(|_| ()));
+}
+
 fn start_task_scheduler() {
     let abort_future = server::shutdown_future();
     let future = Box::pin(run_task_scheduler());
@@ -175,6 +183,7 @@ async fn schedule_tasks() -> Result<(), Error> {
 
     schedule_datastore_garbage_collection().await;
     schedule_datastore_prune().await;
+    schedule_datastore_sync_jobs().await;
 
     Ok(())
 }
@@ -299,7 +308,7 @@ async fn schedule_datastore_garbage_collection() {
         if let Err(err) = WorkerTask::new_thread(
             worker_type,
             Some(store.clone()),
-            "root@pam",
+            "backup@pam",
             false,
             move |worker| {
                 worker.log(format!("starting garbage collection on store {}", store));
@@ -333,7 +342,7 @@ async fn schedule_datastore_prune() {
         let datastore = match DataStore::lookup_datastore(&store) {
             Ok(datastore) => datastore,
             Err(err) => {
-                eprintln!("lookup_datastore failed - {}", err);
+                eprintln!("lookup_datastore '{}' failed - {}", store, err);
                 continue;
             }
         };
@@ -341,7 +350,7 @@ async fn schedule_datastore_prune() {
         let store_config: DataStoreConfig = match serde_json::from_value(store_config) {
             Ok(c) => c,
             Err(err) => {
-                eprintln!("datastore config from_value failed - {}", err);
+                eprintln!("datastore '{}' config from_value failed - {}", store, err);
                 continue;
             }
         };
@@ -407,10 +416,11 @@ async fn schedule_datastore_prune() {
         if let Err(err) = WorkerTask::new_thread(
             worker_type,
             Some(store.clone()),
-            "root@pam",
+            "backup@pam",
             false,
             move |worker| {
                 worker.log(format!("Starting datastore prune on store \"{}\"", store));
+                worker.log(format!("task triggered by schedule '{}'", event_str));
                 worker.log(format!("retention options: {}", prune_options.cli_options_string()));
 
                 let base_path = datastore.base_path();
@@ -444,3 +454,209 @@ async fn schedule_datastore_prune() {
         }
     }
 }
+
+async fn schedule_datastore_sync_jobs() {
+
+    use proxmox_backup::{
+        backup::DataStore,
+        client::{ HttpClient, HttpClientOptions, BackupRepository, pull::pull_store },
+        server::{ WorkerTask },
+        config::{ sync::{self, SyncJobConfig}, remote::{self, Remote} },
+        tools::systemd::time::{ parse_calendar_event, compute_next_event },
+    };
+
+    let config = match sync::config() {
+        Err(err) => {
+            eprintln!("unable to read sync job config - {}", err);
+            return;
+        }
+        Ok((config, _digest)) => config,
+    };
+
+    let remote_config = match remote::config() {
+        Err(err) => {
+            eprintln!("unable to read remote config - {}", err);
+            return;
+        }
+        Ok((config, _digest)) => config,
+    };
+
+    for (job_id, (_, job_config)) in config.sections {
+        let job_config: SyncJobConfig = match serde_json::from_value(job_config) {
+            Ok(c) => c,
+            Err(err) => {
+                eprintln!("sync job config from_value failed - {}", err);
+                continue;
+            }
+        };
+
+        let event_str = match job_config.schedule {
+            Some(ref event_str) => event_str.clone(),
+            None => continue,
+        };
+
+        let event = match parse_calendar_event(&event_str) {
+            Ok(event) => event,
+            Err(err) => {
+                eprintln!("unable to parse schedule '{}' - {}", event_str, err);
+                continue;
+            }
+        };
+
+        //fixme: if last_sync_job_still_running { continue; }
+
+        let worker_type = "sync";
+
+        let last = match lookup_last_worker(worker_type, &job_config.store) {
+            Ok(Some(upid)) => upid.starttime,
+            Ok(None) => 0,
+            Err(err) => {
+                eprintln!("lookup_last_job_start failed: {}", err);
+                continue;
+            }
+        };
+
+        let next = match compute_next_event(&event, last, false) {
+            Ok(next) => next,
+            Err(err) => {
+                eprintln!("compute_next_event for '{}' failed - {}", event_str, err);
+                continue;
+            }
+        };
+
+        let now = match SystemTime::now().duration_since(UNIX_EPOCH) {
+            Ok(epoch_now) => epoch_now.as_secs() as i64,
+            Err(err) => {
+                eprintln!("query system time failed - {}", err);
+                continue;
+            }
+        };
+        if next > now  { continue; }
+
+
+        let job_id2 = job_id.clone();
+
+        let tgt_store = match DataStore::lookup_datastore(&job_config.store) {
+            Ok(datastore) => datastore,
+            Err(err) => {
+                eprintln!("lookup_datastore '{}' failed - {}", job_config.store, err);
+                continue;
+            }
+        };
+
+        let remote: Remote = match remote_config.lookup("remote", &job_config.remote) {
+            Ok(remote) => remote,
+            Err(err) => {
+                eprintln!("remote_config lookup failed: {}", err);
+                continue;
+            }
+        };
+
+        let username = String::from("backup@pam");
+
+        let delete = job_config.remove_vanished.unwrap_or(true);
+
+        if let Err(err) = WorkerTask::spawn(
+            worker_type,
+            Some(job_config.store.clone()),
+            &username.clone(),
+            false,
+            move |worker| async move {
+                worker.log(format!("Starting datastore sync job '{}'", job_id));
+                worker.log(format!("task triggered by schedule '{}'", event_str));
+                worker.log(format!("Sync datastore '{}' from '{}/{}'",
+                                   job_config.store, job_config.remote, job_config.remote_store));
+
+                let options = HttpClientOptions::new()
+                    .password(Some(remote.password.clone()))
+                    .fingerprint(remote.fingerprint.clone());
+
+                let client = HttpClient::new(&remote.host, &remote.userid, options)?;
+                let _auth_info = client.login() // make sure we can auth
+                    .await
+                    .map_err(|err| format_err!("remote connection to '{}' failed - {}", remote.host, err))?;
+
+                let src_repo = BackupRepository::new(Some(remote.userid), Some(remote.host), job_config.remote_store);
+
+                pull_store(&worker, &client, &src_repo, tgt_store, delete, username).await?;
+
+                Ok(())
+            }
+        ) {
+            eprintln!("unable to start datastore sync job {} - {}", job_id2, err);
+        }
+    }
+}
+
+async fn run_stat_generator() {
+
+    loop {
+        let delay_target = Instant::now() +  Duration::from_secs(10);
+
+        generate_host_stats().await;
+
+        tokio::time::delay_until(tokio::time::Instant::from_std(delay_target)).await;
+    }
+
+}
+
+async fn generate_host_stats() {
+    use proxmox::sys::linux::procfs::{
+        read_meminfo, read_proc_stat, read_proc_net_dev};
+    use proxmox_backup::rrd;
+
+    proxmox_backup::tools::runtime::block_in_place(move || {
+
+        match read_proc_stat() {
+            Ok(stat) => {
+                if let Err(err) = rrd::update_value("host/cpu", stat.cpu, rrd::DST::Gauge) {
+                    eprintln!("rrd::update_value 'host/cpu' failed - {}", err);
+                }
+            }
+            Err(err) => {
+                eprintln!("read_proc_stat failed - {}", err);
+            }
+        }
+        match read_meminfo() {
+            Ok(meminfo) => {
+                if let Err(err) = rrd::update_value("host/memtotal", meminfo.memtotal as f64, rrd::DST::Gauge) {
+                    eprintln!("rrd::update_value 'host/memtotal' failed - {}", err);
+                }
+                if let Err(err) = rrd::update_value("host/memused", meminfo.memused as f64, rrd::DST::Gauge) {
+                    eprintln!("rrd::update_value 'host/memused' failed - {}", err);
+                }
+                if let Err(err) = rrd::update_value("host/swaptotal", meminfo.swaptotal as f64, rrd::DST::Gauge) {
+                    eprintln!("rrd::update_value 'host/swaptotal' failed - {}", err);
+                }
+                if let Err(err) = rrd::update_value("host/swapused", meminfo.swapused as f64, rrd::DST::Gauge) {
+                    eprintln!("rrd::update_value 'host/swapused' failed - {}", err);
+                }
+            }
+            Err(err) => {
+                eprintln!("read_meminfo failed - {}", err);
+            }
+        }
+
+        match read_proc_net_dev() {
+            Ok(netdev) => {
+                use proxmox_backup::config::network::is_physical_nic;
+                let mut netin = 0;
+                let mut netout = 0;
+                for item in netdev {
+                    if !is_physical_nic(&item.device) { continue; }
+                    netin += item.receive;
+                    netout += item.send;
+                }
+                if let Err(err) = rrd::update_value("host/netin", netin as f64, rrd::DST::Derive) {
+                    eprintln!("rrd::update_value 'host/netin' failed - {}", err);
+                }
+                if let Err(err) = rrd::update_value("host/netout", netout as f64, rrd::DST::Derive) {
+                    eprintln!("rrd::update_value 'host/netout' failed - {}", err);
+                }
+            }
+            Err(err) => {
+                eprintln!("read_prox_net_dev failed - {}", err);
+            }
+        }
+    });
+}