]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/bin/proxmox-backup-proxy.rs
RRD_CACHE: use a OnceCell instead of lazy_static
[proxmox-backup.git] / src / bin / proxmox-backup-proxy.rs
index 9199ebaed22897374d1c0ccb052e6a87ae8e8a75..66d81bdbf462f4219a89dde742b0e844f262f524 100644 (file)
@@ -17,22 +17,22 @@ use tokio_stream::wrappers::ReceiverStream;
 use serde_json::{json, Value};
 use http::{Method, HeaderMap};
 
-use proxmox::try_block;
-use proxmox::api::{RpcEnvironment, RpcEnvironmentType, UserInformation};
 use proxmox::sys::linux::socket::set_tcp_keepalive;
 use proxmox::tools::fs::CreateOptions;
+use proxmox_lang::try_block;
+use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation};
 
-use pbs_tools::task_log;
+use pbs_tools::{task_log, task_warn};
 use pbs_datastore::DataStore;
 use proxmox_rrd::DST;
 
 use proxmox_rest_server::{
     rotate_task_log_archive, extract_cookie , AuthError, ApiConfig, RestServer, RestEnvironment,
-    ServerAdapter, WorkerTask,
+    ServerAdapter, WorkerTask, cleanup_old_tasks,
 };
 
 use proxmox_backup::{
-    RRD_CACHE,
+    get_rrd_cache, initialize_rrd_cache,
     server::{
         auth::check_pbs_auth,
         jobstate::{
@@ -208,6 +208,9 @@ async fn run() -> Result<(), Error> {
     let _ = public_auth_key(); // load with lazy_static
     let _ = csrf_secret(); // load with lazy_static
 
+    let rrd_cache = initialize_rrd_cache()?;
+    rrd_cache.apply_journal()?;
+
     let mut config = ApiConfig::new(
         pbs_buildcfg::JS_DIR,
         &proxmox_backup::api2::ROUTER,
@@ -577,7 +580,7 @@ async fn schedule_datastore_garbage_collection() {
             }
         };
 
-        let now = proxmox::tools::time::epoch_i64();
+        let now = proxmox_time::epoch_i64();
 
         if next > now  { continue; }
 
@@ -827,6 +830,13 @@ async fn schedule_task_log_rotate() {
                     task_log!(worker, "API authentication log was not rotated");
                 }
 
+                if has_rotated {
+                    task_log!(worker, "cleaning up old task logs");
+                    if let Err(err) = cleanup_old_tasks(true) {
+                        task_warn!(worker, "could not completely cleanup old tasks: {}", err);
+                    }
+                }
+
                 Ok(())
             });
 
@@ -848,11 +858,11 @@ async fn command_reopen_access_logfiles() -> Result<(), Error> {
     // only care about the most recent daemon instance for each, proxy & api, as other older ones
     // should not respond to new requests anyway, but only finish their current one and then exit.
     let sock = proxmox_rest_server::our_ctrl_sock();
-    let f1 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
+    let f1 = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
 
     let pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
     let sock = proxmox_rest_server::ctrl_sock_from_pid(pid);
-    let f2 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
+    let f2 = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
 
     match futures::join!(f1, f2) {
         (Err(e1), Err(e2)) => Err(format_err!("reopen commands failed, proxy: {}; api: {}", e1, e2)),
@@ -866,11 +876,11 @@ async fn command_reopen_auth_logfiles() -> Result<(), Error> {
     // only care about the most recent daemon instance for each, proxy & api, as other older ones
     // should not respond to new requests anyway, but only finish their current one and then exit.
     let sock = proxmox_rest_server::our_ctrl_sock();
-    let f1 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-auth-log-reopen\"}\n");
+    let f1 = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"api-auth-log-reopen\"}\n");
 
     let pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
     let sock = proxmox_rest_server::ctrl_sock_from_pid(pid);
-    let f2 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-auth-log-reopen\"}\n");
+    let f2 = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"api-auth-log-reopen\"}\n");
 
     match futures::join!(f1, f2) {
         (Err(e1), Err(e2)) => Err(format_err!("reopen commands failed, proxy: {}; api: {}", e1, e2)),
@@ -882,14 +892,10 @@ async fn command_reopen_auth_logfiles() -> Result<(), Error> {
 
 async fn run_stat_generator() {
 
-    let mut count = 0;
     loop {
-        count += 1;
-        let save = if count >= 6 { count = 0; true } else { false };
-
         let delay_target = Instant::now() +  Duration::from_secs(10);
 
-        generate_host_stats(save).await;
+        generate_host_stats().await;
 
         tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
 
@@ -897,19 +903,23 @@ async fn run_stat_generator() {
 
 }
 
-fn rrd_update_gauge(name: &str, value: f64, save: bool) {
-    if let Err(err) = RRD_CACHE.update_value(name, value, DST::Gauge, save) {
-        eprintln!("rrd::update_value '{}' failed - {}", name, err);
+fn rrd_update_gauge(name: &str, value: f64) {
+    if let Ok(rrd_cache) = get_rrd_cache() {
+        if let Err(err) = rrd_cache.update_value(name, value, DST::Gauge) {
+            eprintln!("rrd::update_value '{}' failed - {}", name, err);
+        }
     }
 }
 
-fn rrd_update_derive(name: &str, value: f64, save: bool) {
-    if let Err(err) = RRD_CACHE.update_value(name, value, DST::Derive, save) {
-        eprintln!("rrd::update_value '{}' failed - {}", name, err);
+fn rrd_update_derive(name: &str, value: f64) {
+    if let Ok(rrd_cache) = get_rrd_cache() {
+        if let Err(err) = rrd_cache.update_value(name, value, DST::Derive) {
+            eprintln!("rrd::update_value '{}' failed - {}", name, err);
+        }
     }
 }
 
-async fn generate_host_stats(save: bool) {
+async fn generate_host_stats() {
     use proxmox::sys::linux::procfs::{
         read_meminfo, read_proc_stat, read_proc_net_dev, read_loadavg};
 
@@ -917,8 +927,8 @@ async fn generate_host_stats(save: bool) {
 
         match read_proc_stat() {
             Ok(stat) => {
-                rrd_update_gauge("host/cpu", stat.cpu, save);
-                rrd_update_gauge("host/iowait", stat.iowait_percent, save);
+                rrd_update_gauge("host/cpu", stat.cpu);
+                rrd_update_gauge("host/iowait", stat.iowait_percent);
             }
             Err(err) => {
                 eprintln!("read_proc_stat failed - {}", err);
@@ -927,10 +937,10 @@ async fn generate_host_stats(save: bool) {
 
         match read_meminfo() {
             Ok(meminfo) => {
-                rrd_update_gauge("host/memtotal", meminfo.memtotal as f64, save);
-                rrd_update_gauge("host/memused", meminfo.memused as f64, save);
-                rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64, save);
-                rrd_update_gauge("host/swapused", meminfo.swapused as f64, save);
+                rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
+                rrd_update_gauge("host/memused", meminfo.memused as f64);
+                rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
+                rrd_update_gauge("host/swapused", meminfo.swapused as f64);
             }
             Err(err) => {
                 eprintln!("read_meminfo failed - {}", err);
@@ -947,8 +957,8 @@ async fn generate_host_stats(save: bool) {
                     netin += item.receive;
                     netout += item.send;
                 }
-                rrd_update_derive("host/netin", netin as f64, save);
-                rrd_update_derive("host/netout", netout as f64, save);
+                rrd_update_derive("host/netin", netin as f64);
+                rrd_update_derive("host/netout", netout as f64);
             }
             Err(err) => {
                 eprintln!("read_prox_net_dev failed - {}", err);
@@ -957,7 +967,7 @@ async fn generate_host_stats(save: bool) {
 
         match read_loadavg() {
             Ok(loadavg) => {
-                rrd_update_gauge("host/loadavg", loadavg.0 as f64, save);
+                rrd_update_gauge("host/loadavg", loadavg.0 as f64);
             }
             Err(err) => {
                 eprintln!("read_loadavg failed - {}", err);
@@ -966,7 +976,7 @@ async fn generate_host_stats(save: bool) {
 
         let disk_manager = DiskManage::new();
 
-        gather_disk_stats(disk_manager.clone(), Path::new("/"), "host", save);
+        gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
 
         match pbs_config::datastore::config() {
             Ok((config, _)) => {
@@ -977,7 +987,7 @@ async fn generate_host_stats(save: bool) {
 
                     let rrd_prefix = format!("datastore/{}", config.name);
                     let path = std::path::Path::new(&config.path);
-                    gather_disk_stats(disk_manager.clone(), path, &rrd_prefix, save);
+                    gather_disk_stats(disk_manager.clone(), path, &rrd_prefix);
                 }
             }
             Err(err) => {
@@ -1014,18 +1024,18 @@ fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
         }
     };
 
-    let now = proxmox::tools::time::epoch_i64();
+    let now = proxmox_time::epoch_i64();
     next <= now
 }
 
-fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &str, save: bool) {
+fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &str) {
 
     match proxmox_backup::tools::disks::disk_usage(path) {
         Ok(status) => {
             let rrd_key = format!("{}/total", rrd_prefix);
-            rrd_update_gauge(&rrd_key, status.total as f64, save);
+            rrd_update_gauge(&rrd_key, status.total as f64);
             let rrd_key = format!("{}/used", rrd_prefix);
-            rrd_update_gauge(&rrd_key, status.used as f64, save);
+            rrd_update_gauge(&rrd_key, status.used as f64);
         }
         Err(err) => {
             eprintln!("read disk_usage on {:?} failed - {}", path, err);
@@ -1057,17 +1067,17 @@ fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &st
             }
             if let Some(stat) = device_stat {
                 let rrd_key = format!("{}/read_ios", rrd_prefix);
-                rrd_update_derive(&rrd_key, stat.read_ios as f64, save);
+                rrd_update_derive(&rrd_key, stat.read_ios as f64);
                 let rrd_key = format!("{}/read_bytes", rrd_prefix);
-                rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64, save);
+                rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64);
 
                 let rrd_key = format!("{}/write_ios", rrd_prefix);
-                rrd_update_derive(&rrd_key, stat.write_ios as f64, save);
+                rrd_update_derive(&rrd_key, stat.write_ios as f64);
                 let rrd_key = format!("{}/write_bytes", rrd_prefix);
-                rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64, save);
+                rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64);
 
                 let rrd_key = format!("{}/io_ticks", rrd_prefix);
-                rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0, save);
+                rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0);
             }
         }
         Err(err) => {