X-Git-Url: https://git.proxmox.com/?p=proxmox-backup.git;a=blobdiff_plain;f=src%2Fbin%2Fproxmox-backup-proxy.rs;h=fc77345941e9bccc05adf66f6cd8629ae61c19af;hp=7af0229ffeab8842f41ba60846d9729751446da7;hb=45b8a0327f21f048bb4384bafc292954358b5651;hpb=8c03041a2cc5b67817a0727a93e3b69acd192a68

diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 7af0229f..fc773459 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -1,25 +1,75 @@
 use std::sync::Arc;
+use std::path::{Path, PathBuf};
+use std::pin::Pin;
+use std::os::unix::io::AsRawFd;
 
 use anyhow::{bail, format_err, Error};
 use futures::*;
-use hyper;
+
 use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};
+use tokio_stream::wrappers::ReceiverStream;
+use serde_json::Value;
 
 use proxmox::try_block;
 use proxmox::api::RpcEnvironmentType;
 
+use proxmox_backup::{
+    backup::DataStore,
+    server::{
+        auth::default_api_auth,
+        WorkerTask,
+        ApiConfig,
+        rest::*,
+        jobstate::{
+            self,
+            Job,
+        },
+        rotate_task_log_archive,
+    },
+    tools::systemd::time::{
+        parse_calendar_event,
+        compute_next_event,
+    },
+};
+
+
+use proxmox_backup::api2::types::Authid;
 use proxmox_backup::configdir;
 use proxmox_backup::buildcfg;
 use proxmox_backup::server;
-use proxmox_backup::tools::daemon;
-use proxmox_backup::server::{ApiConfig, rest::*};
 use proxmox_backup::auth_helpers::*;
-
-fn main() {
-    if let Err(err) = proxmox_backup::tools::runtime::main(run()) {
-        eprintln!("Error: {}", err);
-        std::process::exit(-1);
+use proxmox_backup::tools::{
+    daemon,
+    disks::{
+        DiskManage,
+        zfs_pool_stats,
+        get_pool_from_dataset,
+    },
+    logrotate::LogRotate,
+    socket::{
+        set_tcp_keepalive,
+        PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
+    },
+};
+
+use proxmox_backup::api2::pull::do_sync_job;
+use proxmox_backup::api2::tape::backup::do_tape_backup_job;
+use proxmox_backup::server::do_verification_job;
+use proxmox_backup::server::do_prune_job;
+
+fn main() -> Result<(), Error> {
+    proxmox_backup::tools::setup_safe_path_env();
+
+    let backup_uid = proxmox_backup::backup::backup_user()?.uid;
+    let backup_gid = proxmox_backup::backup::backup_group()?.gid;
+    let running_uid = nix::unistd::Uid::effective();
+    let running_gid = nix::unistd::Gid::effective();
+
+    if running_uid != backup_uid || running_gid != backup_gid {
+        bail!("proxy not running as backup user or group (got uid {} gid {})", running_uid, running_gid);
     }
+
+    proxmox_backup::tools::runtime::main(run())
 }
 
 async fn run() -> Result<(), Error> {
@@ -30,61 +80,69 @@ async fn run() -> Result<(), Error> {
         bail!("unable to inititialize syslog - {}", err);
     }
 
+    // Note: To debug early connection error use
+    // PROXMOX_DEBUG=1 ./target/release/proxmox-backup-proxy
+    let debug = std::env::var("PROXMOX_DEBUG").is_ok();
+
     let _ = public_auth_key(); // load with lazy_static
     let _ = csrf_secret(); // load with lazy_static
 
     let mut config = ApiConfig::new(
-        buildcfg::JS_DIR, &proxmox_backup::api2::ROUTER, RpcEnvironmentType::PUBLIC)?;
+        buildcfg::JS_DIR,
+        &proxmox_backup::api2::ROUTER,
+        RpcEnvironmentType::PUBLIC,
+        default_api_auth(),
+    )?;
 
-    // add default dirs which includes jquery and bootstrap
-    // my $base = '/usr/share/libpve-http-server-perl';
-    // add_dirs($self->{dirs}, '/css/' => "$base/css/");
-    // add_dirs($self->{dirs}, '/js/' => "$base/js/");
-    // add_dirs($self->{dirs}, '/fonts/' => "$base/fonts/");
     config.add_alias("novnc", "/usr/share/novnc-pve");
     config.add_alias("extjs", "/usr/share/javascript/extjs");
+    config.add_alias("qrcodejs", "/usr/share/javascript/qrcodejs");
     config.add_alias("fontawesome", "/usr/share/fonts-font-awesome");
    config.add_alias("xtermjs", "/usr/share/pve-xtermjs");
+    config.add_alias("locale", "/usr/share/pbs-i18n");
     config.add_alias("widgettoolkit", "/usr/share/javascript/proxmox-widget-toolkit");
-    config.add_alias("css", "/usr/share/javascript/proxmox-backup/css");
     config.add_alias("docs", "/usr/share/doc/proxmox-backup/html");
 
+    let mut indexpath = PathBuf::from(buildcfg::JS_DIR);
+    indexpath.push("index.hbs");
+    config.register_template("index", &indexpath)?;
+    config.register_template("console", "/usr/share/pve-xtermjs/index.html.hbs")?;
+
+    let mut commando_sock = server::CommandoSocket::new(server::our_ctrl_sock());
+
+    config.enable_file_log(buildcfg::API_ACCESS_LOG_FN, &mut commando_sock)?;
+
     let rest_server = RestServer::new(config);
 
     //openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
-    let key_path = configdir!("/proxy.key");
-    let cert_path = configdir!("/proxy.pem");
-
-    let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
-    acceptor.set_private_key_file(key_path, SslFiletype::PEM)
-        .map_err(|err| format_err!("unable to read proxy key {} - {}", key_path, err))?;
-    acceptor.set_certificate_chain_file(cert_path)
-        .map_err(|err| format_err!("unable to read proxy cert {} - {}", cert_path, err))?;
-    acceptor.check_private_key().unwrap();
-    let acceptor = Arc::new(acceptor.build());
+    // we build the initial acceptor here as we cannot start if this fails - certificate reloads
+    // will be handled inside the accept loop and simply log an error if we cannot load the new
+    // certificate!
+    let acceptor = make_tls_acceptor()?;
+
+    // to renew the acceptor we just let a command-socket handler trigger a Notify:
+    let notify_tls_cert_reload = Arc::new(tokio::sync::Notify::new());
+    commando_sock.register_command(
+        "reload-certificate".to_string(),
+        {
+            let notify_tls_cert_reload = Arc::clone(&notify_tls_cert_reload);
+            move |_value| -> Result<_, Error> {
+                notify_tls_cert_reload.notify_one();
+                Ok(Value::Null)
+            }
+        },
+    )?;
 
     let server = daemon::create_daemon(
         ([0,0,0,0,0,0,0,0], 8007).into(),
-        |listener, ready| {
-            let connections = proxmox_backup::tools::async_io::StaticIncoming::from(listener)
-                .map_err(Error::from)
-                .try_filter_map(move |(sock, _addr)| {
-                    let acceptor = Arc::clone(&acceptor);
-                    async move {
-                        sock.set_nodelay(true).unwrap();
-                        sock.set_send_buffer_size(1024*1024).unwrap();
-                        sock.set_recv_buffer_size(1024*1024).unwrap();
-                        Ok(tokio_openssl::accept(&acceptor, sock)
-                            .await
-                            .ok() // handshake errors aren't be fatal, so return None to filter
-                        )
-                    }
-                });
-            let connections = proxmox_backup::tools::async_io::HyperAccept(connections);
+        move |listener, ready| {
+
+            let connections = accept_connections(listener, acceptor, debug, notify_tls_cert_reload);
+            let connections = hyper::server::accept::from_stream(ReceiverStream::new(connections));
 
             Ok(ready
-                .and_then(|_| hyper::Server::builder(connections)
+                .and_then(|_| hyper::Server::builder(connections)
                     .serve(rest_server)
                     .with_graceful_shutdown(server::shutdown_future())
                     .map_err(Error::from)
@@ -93,12 +151,15 @@ async fn run() -> Result<(), Error> {
                 .map(|_| ())
             )
         },
+        "proxmox-backup-proxy.service",
     );
 
+    server::write_pid(buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
     daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
 
     let init_result: Result<(), Error> = try_block!({
-        server::create_task_control_socket()?;
+        server::register_task_control_commands(&mut commando_sock)?;
+        commando_sock.spawn()?;
         server::server_state_init()?;
         Ok(())
     });
@@ -118,6 +179,142 @@ async fn run() -> Result<(), Error> {
     Ok(())
 }
 
+fn make_tls_acceptor() -> Result<Arc<SslAcceptor>, Error> {
+    let key_path = configdir!("/proxy.key");
+    let cert_path = configdir!("/proxy.pem");
+
+    let mut acceptor = SslAcceptor::mozilla_intermediate_v5(SslMethod::tls()).unwrap();
+    acceptor.set_private_key_file(key_path, SslFiletype::PEM)
+        .map_err(|err| format_err!("unable to read proxy key {} - {}", key_path, err))?;
+    acceptor.set_certificate_chain_file(cert_path)
+        .map_err(|err| format_err!("unable to read proxy cert {} - {}", cert_path, err))?;
+    acceptor.check_private_key().unwrap();
+
+    Ok(Arc::new(acceptor.build()))
+}
+
+type ClientStreamResult =
+    Result<Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>, Error>;
+const MAX_PENDING_ACCEPTS: usize = 1024;
+
+fn accept_connections(
+    listener: tokio::net::TcpListener,
+    acceptor: Arc<SslAcceptor>,
+    debug: bool,
+    notify_tls_cert_reload: Arc<tokio::sync::Notify>,
+) -> tokio::sync::mpsc::Receiver<ClientStreamResult> {
+
+    let (sender, receiver) = tokio::sync::mpsc::channel(MAX_PENDING_ACCEPTS);
+
+    tokio::spawn(accept_connection(listener, acceptor, debug, sender, notify_tls_cert_reload));
+
+    receiver
+}
+
+async fn accept_connection(
+    listener: tokio::net::TcpListener,
+    mut acceptor: Arc<SslAcceptor>,
+    debug: bool,
+    sender: tokio::sync::mpsc::Sender<ClientStreamResult>,
+    notify_tls_cert_reload: Arc<tokio::sync::Notify>,
+) {
+    let accept_counter = Arc::new(());
+
+    // Note that these must not be moved out/modified directly, they get pinned in the loop and
+    // "rearmed" after waking up:
+    let mut reload_tls = notify_tls_cert_reload.notified();
+    let mut accept = listener.accept();
+
+    loop {
+        let sock;
+
+        // normally we'd use `tokio::pin!()` but we need this to happen outside the loop and we
+        // need to be able to "rearm" the futures:
+        let reload_tls_pin = unsafe { Pin::new_unchecked(&mut reload_tls) };
+        let accept_pin = unsafe { Pin::new_unchecked(&mut accept) };
+        tokio::select! {
+            _ = reload_tls_pin => {
+                // rearm the notification:
+                reload_tls = notify_tls_cert_reload.notified();
+
+                log::info!("reloading certificate");
+                match make_tls_acceptor() {
+                    Err(err) => eprintln!("error reloading certificate: {}", err),
+                    Ok(new_acceptor) => acceptor = new_acceptor,
+                }
+                continue;
+            }
+            res = accept_pin => match res {
+                Err(err) => {
+                    eprintln!("error accepting tcp connection: {}", err);
+                    continue;
+                }
+                Ok((new_sock, _addr)) => {
+                    // rearm the accept future:
+                    accept = listener.accept();
+
+                    sock = new_sock;
+                }
+            }
+        };
+
+        sock.set_nodelay(true).unwrap();
+        let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+        let acceptor = Arc::clone(&acceptor);
+
+        let ssl = match openssl::ssl::Ssl::new(acceptor.context()) {
+            Ok(ssl) => ssl,
+            Err(err) => {
+                eprintln!("failed to create Ssl object from Acceptor context - {}", err);
+                continue;
+            },
+        };
+        let stream = match tokio_openssl::SslStream::new(ssl, sock) {
+            Ok(stream) => stream,
+            Err(err) => {
+                eprintln!("failed to create SslStream using ssl and connection socket - {}", err);
+                continue;
+            },
+        };
+
+        let mut stream = Box::pin(stream);
+        let sender = sender.clone();
+
+        if Arc::strong_count(&accept_counter) > MAX_PENDING_ACCEPTS {
+            eprintln!("connection rejected - to many open connections");
+            continue;
+        }
+
+        let accept_counter = Arc::clone(&accept_counter);
+        tokio::spawn(async move {
+            let accept_future = tokio::time::timeout(
+                Duration::new(10, 0), stream.as_mut().accept());
+
+            let result = accept_future.await;
+
+            match result {
+                Ok(Ok(())) => {
+                    if sender.send(Ok(stream)).await.is_err() && debug {
+                        eprintln!("detect closed connection channel");
+                    }
+                }
+                Ok(Err(err)) => {
+                    if debug {
+                        eprintln!("https handshake failed - {}", err);
+                    }
+                }
+                Err(_) => {
+                    if debug {
+                        eprintln!("https handshake timeout");
+                    }
+                }
+            }
+
+            drop(accept_counter); // decrease reference count
+        });
+    }
+}
+
 fn start_stat_generator() {
     let abort_future = server::shutdown_future();
     let future = Box::pin(run_stat_generator());
@@ -132,11 +329,12 @@ fn start_task_scheduler() {
     tokio::spawn(task.map(|_| ()));
 }
 
-use std::time:: {Instant, Duration, SystemTime, UNIX_EPOCH};
+use std::time::{SystemTime, Instant, Duration, UNIX_EPOCH};
 
 fn next_minute() -> Result<Instant, Error> {
-    let epoch_now = SystemTime::now().duration_since(UNIX_EPOCH)?;
-    let epoch_next = Duration::from_secs((epoch_now.as_secs()/60 + 1)*60);
+    let now = SystemTime::now();
+    let epoch_now = now.duration_since(UNIX_EPOCH)?;
+    let epoch_next = Duration::from_secs((epoch_now.as_secs()/60 + 1)*60);
     Ok(Instant::now() + epoch_next - epoch_now)
 }
 
@@ -151,7 +349,7 @@ async fn run_task_scheduler() {
             Ok(d) => d,
             Err(err) => {
                 eprintln!("task scheduler: compute next minute failed - {}", err);
-                tokio::time::delay_until(tokio::time::Instant::from_std(Instant::now() + Duration::from_secs(60))).await;
+                tokio::time::sleep_until(tokio::time::Instant::from_std(Instant::now() + Duration::from_secs(60))).await;
                 continue;
             }
         };
@@ -175,7 +373,7 @@ async fn run_task_scheduler() {
             }
         }
 
-        tokio::time::delay_until(tokio::time::Instant::from_std(delay_target)).await;
+        tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
     }
 }
 
@@ -184,46 +382,21 @@ async fn schedule_tasks() -> Result<(), Error> {
     schedule_datastore_garbage_collection().await;
     schedule_datastore_prune().await;
     schedule_datastore_sync_jobs().await;
+    schedule_datastore_verify_jobs().await;
+    schedule_tape_backup_jobs().await;
+    schedule_task_log_rotate().await;
 
     Ok(())
 }
 
-fn lookup_last_worker(worker_type: &str, worker_id: &str) -> Result<Option<server::UPID>, Error> {
-
-    let list = proxmox_backup::server::read_task_list()?;
-
-    let mut last: Option<&server::UPID> = None;
-
-    for entry in list.iter() {
-        if entry.upid.worker_type == worker_type {
-            if let Some(ref id) = entry.upid.worker_id {
-                if id == worker_id {
-                    match last {
-                        Some(ref upid) => {
-                            if upid.starttime < entry.upid.starttime {
-                                last = Some(&entry.upid)
-                            }
-                        }
-                        None => {
-                            last = Some(&entry.upid)
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    Ok(last.cloned())
-}
-
-
 async fn schedule_datastore_garbage_collection() {
 
-    use proxmox_backup::backup::DataStore;
-    use proxmox_backup::server::{UPID, WorkerTask};
-    use proxmox_backup::config::datastore::{self, DataStoreConfig};
-    use proxmox_backup::tools::systemd::time::{
-        parse_calendar_event, compute_next_event};
+    use proxmox_backup::config::{
+        datastore::{
+            self,
+            DataStoreConfig,
+        },
+    };
 
     let config = match datastore::config() {
         Err(err) => {
@@ -267,68 +440,51 @@ async fn schedule_datastore_garbage_collection() {
 
         let worker_type = "garbage_collection";
 
-        let stat = datastore.last_gc_status();
-        let last = if let Some(upid_str) = stat.upid {
-            match upid_str.parse::<UPID>() {
-                Ok(upid) => upid.starttime,
-                Err(err) => {
-                    eprintln!("unable to parse upid '{}' - {}", upid_str, err);
-                    continue;
-                }
-            }
-        } else {
-            match lookup_last_worker(worker_type, &store) {
-                Ok(Some(upid)) => upid.starttime,
-                Ok(None) => 0,
-                Err(err) => {
-                    eprintln!("lookup_last_job_start failed: {}", err);
-                    continue;
-                }
+        let last = match jobstate::last_run_time(worker_type, &store) {
+            Ok(time) => time,
+            Err(err) => {
+                eprintln!("could not get last run time of {} {}: {}", worker_type, store, err);
+                continue;
            }
        };
 
         let next = match compute_next_event(&event, last, false) {
-            Ok(next) => next,
+            Ok(Some(next)) => next,
+            Ok(None) => continue,
             Err(err) => {
                 eprintln!("compute_next_event for '{}' failed - {}", event_str, err);
                 continue;
             }
         };
 
-        let now = match SystemTime::now().duration_since(UNIX_EPOCH) {
-            Ok(epoch_now) => epoch_now.as_secs() as i64,
-            Err(err) => {
-                eprintln!("query system time failed - {}", err);
-                continue;
-            }
-        };
+
+        let now = proxmox::tools::time::epoch_i64();
+
         if next > now { continue; }
 
-        let store2 = store.clone();
+        let job = match Job::new(worker_type, &store) {
+            Ok(job) => job,
+            Err(_) => continue, // could not get lock
+        };
 
-        if let Err(err) = WorkerTask::new_thread(
-            worker_type,
-            Some(store.clone()),
-            "backup@pam",
-            false,
-            move |worker| {
-                worker.log(format!("starting garbage collection on store {}", store));
-                worker.log(format!("task triggered by schedule '{}'", event_str));
-                datastore.garbage_collection(&worker)
-            }
-        ) {
-            eprintln!("unable to start garbage collection on store {} - {}", store2, err);
+        let auth_id = Authid::root_auth_id();
+
+        if let Err(err) = crate::server::do_garbage_collection_job(job, datastore, auth_id, Some(event_str), false) {
+            eprintln!("unable to start garbage collection job on datastore {} - {}", store, err);
        }
    }
}
 
 async fn schedule_datastore_prune() {
 
-    use proxmox_backup::backup::{
-        PruneOptions, DataStore, BackupGroup, BackupDir, compute_prune_info};
-    use proxmox_backup::server::{WorkerTask};
-    use proxmox_backup::config::datastore::{self, DataStoreConfig};
-    use proxmox_backup::tools::systemd::time::{
-        parse_calendar_event, compute_next_event};
+    use proxmox_backup::{
+        backup::{
+            PruneOptions,
+        },
+        config::datastore::{
+            self,
+            DataStoreConfig,
+        },
+    };
 
     let config = match datastore::config() {
         Err(err) => {
@@ -339,13 +495,6 @@ async fn schedule_datastore_prune() {
     };
 
     for (store, (_, store_config)) in config.sections {
-        let datastore = match DataStore::lookup_datastore(&store) {
-            Ok(datastore) => datastore,
-            Err(err) => {
-                eprintln!("lookup_datastore '{}' failed - {}", store, err);
-                continue;
-            }
-        };
 
         let store_config: DataStoreConfig = match serde_json::from_value(store_config) {
             Ok(c) => c,
@@ -373,96 +522,26 @@ async fn schedule_datastore_prune() {
             continue;
         }
 
-        let event = match parse_calendar_event(&event_str) {
-            Ok(event) => event,
-            Err(err) => {
-                eprintln!("unable to parse schedule '{}' - {}", event_str, err);
-                continue;
-            }
-        };
-
-        //fixme: if last_prune_job_stzill_running { continue; }
-
         let worker_type = "prune";
+        if check_schedule(worker_type, &event_str, &store) {
+            let job = match Job::new(worker_type, &store) {
+                Ok(job) => job,
+                Err(_) => continue, // could not get lock
+            };
 
-        let last = match lookup_last_worker(worker_type, &store) {
-            Ok(Some(upid)) => upid.starttime,
-            Ok(None) => 0,
-            Err(err) => {
-                eprintln!("lookup_last_job_start failed: {}", err);
-                continue;
+            let auth_id = Authid::root_auth_id().clone();
+            if let Err(err) = do_prune_job(job, prune_options, store.clone(), &auth_id, Some(event_str)) {
+                eprintln!("unable to start datastore prune job {} - {}", &store, err);
             }
         };
-
-        let next = match compute_next_event(&event, last, false) {
-            Ok(next) => next,
-            Err(err) => {
-                eprintln!("compute_next_event for '{}' failed - {}", event_str, err);
-                continue;
-            }
-        };
-
-        let now = match SystemTime::now().duration_since(UNIX_EPOCH) {
-            Ok(epoch_now) => epoch_now.as_secs() as i64,
-            Err(err) => {
-                eprintln!("query system time failed - {}", err);
-                continue;
-            }
-        };
-        if next > now { continue; }
-
-        let store2 = store.clone();
-
-        if let Err(err) = WorkerTask::new_thread(
-            worker_type,
-            Some(store.clone()),
-            "backup@pam",
-            false,
-            move |worker| {
-                worker.log(format!("Starting datastore prune on store \"{}\"", store));
-                worker.log(format!("task triggered by schedule '{}'", event_str));
-                worker.log(format!("retention options: {}", prune_options.cli_options_string()));
-
-                let base_path = datastore.base_path();
-
-                let groups = BackupGroup::list_groups(&base_path)?;
-                for group in groups {
-                    let list = group.list_backups(&base_path)?;
-                    let mut prune_info = compute_prune_info(list, &prune_options)?;
-                    prune_info.reverse(); // delete older snapshots first
-
-                    worker.log(format!("Starting prune on store \"{}\" group \"{}/{}\"",
-                            store, group.backup_type(), group.backup_id()));
-
-                    for (info, keep) in prune_info {
-                        worker.log(format!(
-                                "{} {}/{}/{}",
-                                if keep { "keep" } else { "remove" },
-                                group.backup_type(), group.backup_id(),
-                                BackupDir::backup_time_to_string(info.backup_dir.backup_time())));
-
-                        if !keep {
-                            datastore.remove_backup_dir(&info.backup_dir)?;
-                        }
-                    }
-                }
-
-                Ok(())
-            }
-        ) {
-            eprintln!("unable to start datastore prune on store {} - {}", store2, err);
-        }
     }
 }
 
 async fn schedule_datastore_sync_jobs() {
 
-    use proxmox_backup::{
-        backup::DataStore,
-        client::{ HttpClient, HttpClientOptions, BackupRepository, pull::pull_store },
-        server::{ WorkerTask },
-        config::{ sync::{self, SyncJobConfig}, remote::{self, Remote} },
-        tools::systemd::time::{ parse_calendar_event, compute_next_event },
+    use proxmox_backup::config::sync::{
+        self,
+        SyncJobConfig,
     };
 
     let config = match sync::config() {
@@ -473,14 +552,6 @@ async fn schedule_datastore_sync_jobs() {
         Ok((config, _digest)) => config,
     };
 
-    let remote_config = match remote::config() {
-        Err(err) => {
-            eprintln!("unable to read remote config - {}", err);
-            return;
-        }
-        Ok((config, _digest)) => config,
-    };
-
     for (job_id, (_, job_config)) in config.sections {
         let job_config: SyncJobConfig = match serde_json::from_value(job_config) {
             Ok(c) => c,
@@ -495,138 +566,247 @@ async fn schedule_datastore_sync_jobs() {
             None => continue,
         };
 
-        let event = match parse_calendar_event(&event_str) {
-            Ok(event) => event,
-            Err(err) => {
-                eprintln!("unable to parse schedule '{}' - {}", event_str, err);
-                continue;
+        let worker_type = "syncjob";
+        if check_schedule(worker_type, &event_str, &job_id) {
+            let job = match Job::new(worker_type, &job_id) {
+                Ok(job) => job,
+                Err(_) => continue, // could not get lock
+            };
+
+            let auth_id = Authid::root_auth_id().clone();
+            if let Err(err) = do_sync_job(job, job_config, &auth_id, Some(event_str)) {
+                eprintln!("unable to start datastore sync job {} - {}", &job_id, err);
             }
         };
+    }
+}
 
-        //fixme: if last_sync_job_still_running { continue; }
+async fn schedule_datastore_verify_jobs() {
 
-        let worker_type = "sync";
+    use proxmox_backup::config::verify::{
+        self,
+        VerificationJobConfig,
+    };
 
-        let last = match lookup_last_worker(worker_type, &job_config.store) {
-            Ok(Some(upid)) => upid.starttime,
-            Ok(None) => 0,
+    let config = match verify::config() {
+        Err(err) => {
+            eprintln!("unable to read verification job config - {}", err);
+            return;
+        }
+        Ok((config, _digest)) => config,
+    };
+    for (job_id, (_, job_config)) in config.sections {
+        let job_config: VerificationJobConfig = match serde_json::from_value(job_config) {
+            Ok(c) => c,
             Err(err) => {
-                eprintln!("lookup_last_job_start failed: {}", err);
+                eprintln!("verification job config from_value failed - {}", err);
                 continue;
             }
         };
-
-        let next = match compute_next_event(&event, last, false) {
-            Ok(next) => next,
-            Err(err) => {
-                eprintln!("compute_next_event for '{}' failed - {}", event_str, err);
-                continue;
-            }
+        let event_str = match job_config.schedule {
+            Some(ref event_str) => event_str.clone(),
+            None => continue,
         };
 
-        let now = match SystemTime::now().duration_since(UNIX_EPOCH) {
-            Ok(epoch_now) => epoch_now.as_secs() as i64,
-            Err(err) => {
-                eprintln!("query system time failed - {}", err);
-                continue;
+        let worker_type = "verificationjob";
+        let auth_id = Authid::root_auth_id().clone();
+        if check_schedule(worker_type, &event_str, &job_id) {
+            let job = match Job::new(&worker_type, &job_id) {
+                Ok(job) => job,
+                Err(_) => continue, // could not get lock
+            };
+            if let Err(err) = do_verification_job(job, job_config, &auth_id, Some(event_str)) {
+                eprintln!("unable to start datastore verification job {} - {}", &job_id, err);
             }
         };
-        if next > now { continue; }
+    }
+}
 
-        let job_id2 = job_id.clone();
+async fn schedule_tape_backup_jobs() {
 
-        let tgt_store = match DataStore::lookup_datastore(&job_config.store) {
-            Ok(datastore) => datastore,
+    use proxmox_backup::config::tape_job::{
+        self,
+        TapeBackupJobConfig,
+    };
+
+    let config = match tape_job::config() {
+        Err(err) => {
+            eprintln!("unable to read tape job config - {}", err);
+            return;
+        }
+        Ok((config, _digest)) => config,
+    };
+    for (job_id, (_, job_config)) in config.sections {
+        let job_config: TapeBackupJobConfig = match serde_json::from_value(job_config) {
+            Ok(c) => c,
             Err(err) => {
-                eprintln!("lookup_datastore '{}' failed - {}", job_config.store, err);
+                eprintln!("tape backup job config from_value failed - {}", err);
                 continue;
             }
         };
+        let event_str = match job_config.schedule {
+            Some(ref event_str) => event_str.clone(),
+            None => continue,
+        };
 
-        let remote: Remote = match remote_config.lookup("remote", &job_config.remote) {
-            Ok(remote) => remote,
-            Err(err) => {
-                eprintln!("remote_config lookup failed: {}", err);
-                continue;
+        let worker_type = "tape-backup-job";
+        let auth_id = Authid::root_auth_id().clone();
+        if check_schedule(worker_type, &event_str, &job_id) {
+            let job = match Job::new(&worker_type, &job_id) {
+                Ok(job) => job,
+                Err(_) => continue, // could not get lock
+            };
+            if let Err(err) = do_tape_backup_job(job, job_config.setup, &auth_id, Some(event_str)) {
+                eprintln!("unable to start tape backup job {} - {}", &job_id, err);
             }
         };
+    }
+}
 
-        let username = String::from("backup@pam");
-        let delete = job_config.remove_vanished.unwrap_or(true);
+async fn schedule_task_log_rotate() {
 
-        if let Err(err) = WorkerTask::spawn(
-            worker_type,
-            Some(job_config.store.clone()),
-            &username.clone(),
-            false,
-            move |worker| async move {
-                worker.log(format!("Starting datastore sync job '{}'", job_id));
-                worker.log(format!("task triggered by schedule '{}'", event_str));
-                worker.log(format!("Sync datastore '{}' from '{}/{}'",
-                        job_config.store, job_config.remote, job_config.remote_store));
+    let worker_type = "logrotate";
+    let job_id = "access-log_and_task-archive";
 
-                let options = HttpClientOptions::new()
-                    .password(Some(remote.password.clone()))
-                    .fingerprint(remote.fingerprint.clone());
+    // schedule daily at 00:00 like normal logrotate
+    let schedule = "00:00";
 
-                let client = HttpClient::new(&remote.host, &remote.userid, options)?;
-                let _auth_info = client.login() // make sure we can auth
-                    .await
-                    .map_err(|err| format_err!("remote connection to '{}' failed - {}", remote.host, err))?;
+    if !check_schedule(worker_type, schedule, job_id) {
+        // if we never ran the rotation, schedule instantly
+        match jobstate::JobState::load(worker_type, job_id) {
+            Ok(state) => match state {
+                jobstate::JobState::Created { .. } => {},
+                _ => return,
+            },
+            _ => return,
+        }
+    }
 
-                let src_repo = BackupRepository::new(Some(remote.userid), Some(remote.host), job_config.remote_store);
+    let mut job = match Job::new(worker_type, job_id) {
+        Ok(job) => job,
+        Err(_) => return, // could not get lock
+    };
+
+    if let Err(err) = WorkerTask::new_thread(
+        worker_type,
+        None,
+        Authid::root_auth_id().clone(),
+        false,
+        move |worker| {
+            job.start(&worker.upid().to_string())?;
+            worker.log("starting task log rotation".to_string());
+
+            let result = try_block!({
+                let max_size = 512 * 1024 - 1; // an entry has ~ 100b, so > 5000 entries/file
+                let max_files = 20; // times twenty files gives > 100000 task entries
+                let has_rotated = rotate_task_log_archive(max_size, true, Some(max_files))?;
+                if has_rotated {
+                    worker.log("task log archive was rotated".to_string());
+                } else {
+                    worker.log("task log archive was not rotated".to_string());
+                }
+
+                let max_size = 32 * 1024 * 1024 - 1;
+                let max_files = 14;
+                let mut logrotate = LogRotate::new(buildcfg::API_ACCESS_LOG_FN, true)
+                    .ok_or_else(|| format_err!("could not get API access log file names"))?;
+
+                if logrotate.rotate(max_size, None, Some(max_files))? {
+                    println!("rotated access log, telling daemons to re-open log file");
+                    proxmox_backup::tools::runtime::block_on(command_reopen_logfiles())?;
+                    worker.log("API access log was rotated".to_string());
+                } else {
+                    worker.log("API access log was not rotated".to_string());
+                }
+
+                let mut logrotate = LogRotate::new(buildcfg::API_AUTH_LOG_FN, true)
+                    .ok_or_else(|| format_err!("could not get API auth log file names"))?;
 
-                pull_store(&worker, &client, &src_repo, tgt_store, delete, username).await?;
+                if logrotate.rotate(max_size, None, Some(max_files))? {
+                    worker.log("API authentication log was rotated".to_string());
+                } else {
+                    worker.log("API authentication log was not rotated".to_string());
+                }
 
                 Ok(())
+            });
+
+            let status = worker.create_state(&result);
+
+            if let Err(err) = job.finish(status) {
+                eprintln!("could not finish job state for {}: {}", worker_type, err);
             }
-        ) {
-            eprintln!("unable to start datastore sync job {} - {}", job_id2, err);
-        }
+
+            result
+        },
+    ) {
+        eprintln!("unable to start task log rotation: {}", err);
+    }
+
+}
+
+async fn command_reopen_logfiles() -> Result<(), Error> {
+    // only care about the most recent daemon instance for each, proxy & api, as other older ones
+    // should not respond to new requests anyway, but only finish their current one and then exit.
+    let sock = server::our_ctrl_sock();
+    let f1 = server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
+
+    let pid = server::read_pid(buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
+    let sock = server::ctrl_sock_from_pid(pid);
+    let f2 = server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
+
+    match futures::join!(f1, f2) {
+        (Err(e1), Err(e2)) => Err(format_err!("reopen commands failed, proxy: {}; api: {}", e1, e2)),
+        (Err(e1), Ok(_)) => Err(format_err!("reopen commands failed, proxy: {}", e1)),
+        (Ok(_), Err(e2)) => Err(format_err!("reopen commands failed, api: {}", e2)),
+        _ => Ok(()),
     }
 }
 
 async fn run_stat_generator() {
 
+    let mut count = 0;
     loop {
+        count += 1;
+        let save = if count >= 6 { count = 0; true } else { false };
+
         let delay_target = Instant::now() + Duration::from_secs(10);
 
-        generate_host_stats().await;
+        generate_host_stats(save).await;
 
-        tokio::time::delay_until(tokio::time::Instant::from_std(delay_target)).await;
-    }
+        tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
+
+    }
 }
 
-fn rrd_update_gauge(name: &str, value: f64) {
+fn rrd_update_gauge(name: &str, value: f64, save: bool) {
     use proxmox_backup::rrd;
-    if let Err(err) = rrd::update_value(name, value, rrd::DST::Gauge) {
+    if let Err(err) = rrd::update_value(name, value, rrd::DST::Gauge, save) {
         eprintln!("rrd::update_value '{}' failed - {}", name, err);
     }
 }
 
-fn rrd_update_derive(name: &str, value: f64) {
+fn rrd_update_derive(name: &str, value: f64, save: bool) {
     use proxmox_backup::rrd;
-    if let Err(err) = rrd::update_value(name, value, rrd::DST::Derive) {
+    if let Err(err) = rrd::update_value(name, value, rrd::DST::Derive, save) {
         eprintln!("rrd::update_value '{}' failed - {}", name, err);
     }
 }
 
-async fn generate_host_stats() {
+async fn generate_host_stats(save: bool) {
     use proxmox::sys::linux::procfs::{
         read_meminfo, read_proc_stat, read_proc_net_dev, read_loadavg};
     use proxmox_backup::config::datastore;
-    use proxmox_backup::tools::disks::DiskManage;
 
     proxmox_backup::tools::runtime::block_in_place(move || {
 
         match read_proc_stat() {
             Ok(stat) => {
-                rrd_update_gauge("host/cpu", stat.cpu);
-                rrd_update_gauge("host/iowait", stat.iowait_percent);
+                rrd_update_gauge("host/cpu", stat.cpu, save);
+                rrd_update_gauge("host/iowait", stat.iowait_percent, save);
             }
             Err(err) => {
                 eprintln!("read_proc_stat failed - {}", err);
@@ -635,10 +815,10 @@ async fn generate_host_stats() {
 
         match read_meminfo() {
             Ok(meminfo) => {
-                rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
-                rrd_update_gauge("host/memused", meminfo.memused as f64);
-                rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
-                rrd_update_gauge("host/swapused", meminfo.swapused as f64);
+                rrd_update_gauge("host/memtotal", meminfo.memtotal as f64, save);
+                rrd_update_gauge("host/memused", meminfo.memused as f64, save);
+                rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64, save);
+                rrd_update_gauge("host/swapused", meminfo.swapused as f64, save);
             }
             Err(err) => {
                 eprintln!("read_meminfo failed - {}", err);
@@ -655,8 +835,8 @@ async fn generate_host_stats() {
                     netin += item.receive;
                     netout += item.send;
                 }
-                rrd_update_derive("host/netin", netin as f64);
-                rrd_update_derive("host/netout", netout as f64);
+                rrd_update_derive("host/netin", netin as f64, save);
+                rrd_update_derive("host/netout", netout as f64, save);
             }
             Err(err) => {
                 eprintln!("read_prox_net_dev failed - {}", err);
@@ -665,71 +845,27 @@ async fn generate_host_stats() {
 
         match read_loadavg() {
             Ok(loadavg) => {
-                rrd_update_gauge("host/loadavg", loadavg.0 as f64);
+                rrd_update_gauge("host/loadavg", loadavg.0 as f64, save);
             }
             Err(err) => {
                 eprintln!("read_loadavg failed - {}", err);
             }
         }
 
-        match disk_usage(std::path::Path::new("/")) {
-            Ok((total, used, _avail)) => {
-                rrd_update_gauge("host/roottotal", total as f64);
-                rrd_update_gauge("host/rootused", used as f64);
-            }
-            Err(err) => {
-                eprintln!("read root disk_usage failed - {}", err);
-            }
-        }
-
         let disk_manager = DiskManage::new();
 
+        gather_disk_stats(disk_manager.clone(), Path::new("/"), "host", save);
+
         match datastore::config() {
             Ok((config, _)) => {
                 let datastore_list: Vec<datastore::DataStoreConfig> =
-                    config.convert_to_typed_array("datastore").unwrap_or(Vec::new());
+                    config.convert_to_typed_array("datastore").unwrap_or_default();
 
                 for config in datastore_list {
-                    match disk_usage(std::path::Path::new(&config.path)) {
-                        Ok((total, used, _avail)) => {
-                            let rrd_key = format!("datastore/{}/total", config.name);
-                            rrd_update_gauge(&rrd_key, total as f64);
-                            let rrd_key = format!("datastore/{}/used", config.name);
-                            rrd_update_gauge(&rrd_key, used as f64);
-                        }
-                        Err(err) => {
-                            eprintln!("read disk_usage on {:?} failed - {}", config.path, err);
-                        }
-                    }
+                    let rrd_prefix = format!("datastore/{}", config.name);
 
                     let path = std::path::Path::new(&config.path);
-
-                    match disk_manager.mount_info() {
-                        Ok(mountinfo) => {
-                            if let Some(device) = find_mounted_device(mountinfo, path) {
-                                if let Ok(disk) = disk_manager.clone().disk_by_dev_num(device.into_dev_t()) {
-                                    if let Ok(Some(stat)) = disk.read_stat() {
-                                        let rrd_key = format!("datastore/{}/read_ios", config.name);
-                                        rrd_update_derive(&rrd_key, stat.read_ios as f64);
-                                        let rrd_key = format!("datastore/{}/read_bytes", config.name);
-                                        rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64);
-                                        let rrd_key = format!("datastore/{}/read_ticks", config.name);
-                                        rrd_update_derive(&rrd_key, (stat.read_ticks as f64)/1000.0);
-
-                                        let rrd_key = format!("datastore/{}/write_ios", config.name);
-                                        rrd_update_derive(&rrd_key, stat.write_ios as f64);
-                                        let rrd_key = format!("datastore/{}/write_bytes", config.name);
-                                        rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64);
-                                        let rrd_key = format!("datastore/{}/write_ticks", config.name);
-                                        rrd_update_derive(&rrd_key, (stat.write_ticks as f64)/1000.0);
-                                    }
-                                }
-                            }
-                        }
-                        Err(err) => {
-                            eprintln!("disk_manager mount_info() failed - {}", err);
-                        }
-                    }
+                    gather_disk_stats(disk_manager.clone(), path, &rrd_prefix, save);
                 }
             }
             Err(err) => {
@@ -740,39 +876,90 @@ async fn generate_host_stats() {
     });
 }
 
-// Returns (total, used, avail)
-fn disk_usage(path: &std::path::Path) -> Result<(u64, u64, u64), Error> {
+fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
+    let event = match parse_calendar_event(event_str) {
+        Ok(event) => event,
+        Err(err) => {
+            eprintln!("unable to parse schedule '{}' - {}", event_str, err);
+            return false;
+        }
+    };
 
-    let mut stat: libc::statfs64 = unsafe { std::mem::zeroed() };
+    let last = match jobstate::last_run_time(worker_type, &id) {
+        Ok(time) => time,
+        Err(err) => {
+            eprintln!("could not get last run time of {} {}: {}", worker_type, id, err);
+            return false;
+        }
+    };
 
-    use nix::NixPath;
+    let next = match compute_next_event(&event, last, false) {
+        Ok(Some(next)) => next,
+        Ok(None) => return false,
+        Err(err) => {
+            eprintln!("compute_next_event for '{}' failed - {}", event_str, err);
+            return false;
+        }
+    };
 
-    let res = path.with_nix_path(|cstr| unsafe { libc::statfs64(cstr.as_ptr(), &mut stat) })?;
-    nix::errno::Errno::result(res)?;
+    let now = proxmox::tools::time::epoch_i64();
+    next <= now
+}
 
-    let bsize = stat.f_bsize as u64;
+fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &str, save: bool) {
 
-    Ok((stat.f_blocks*bsize, (stat.f_blocks-stat.f_bfree)*bsize, stat.f_bavail*bsize))
-}
+    match proxmox_backup::tools::disks::disk_usage(path) {
+        Ok(status) => {
+            let rrd_key = format!("{}/total", rrd_prefix);
+            rrd_update_gauge(&rrd_key, status.total as f64, save);
+            let rrd_key = format!("{}/used", rrd_prefix);
+            rrd_update_gauge(&rrd_key, status.used as f64, save);
+        }
+        Err(err) => {
+            eprintln!("read disk_usage on {:?} failed - {}", path, err);
+        }
+    }
 
-pub fn find_mounted_device(
-    mountinfo: &proxmox::sys::linux::procfs::mountinfo::MountInfo,
-    path: &std::path::Path,
-) -> Option<proxmox::sys::linux::procfs::mountinfo::Device> {
+    match disk_manager.find_mounted_device(path) {
+        Ok(None) => {},
+        Ok(Some((fs_type, device, source))) => {
+            let mut device_stat = None;
+            match fs_type.as_str() {
+                "zfs" => {
+                    if let Some(source) = source {
+                        let pool = get_pool_from_dataset(&source).unwrap_or(&source);
+                        match zfs_pool_stats(pool) {
+                            Ok(stat) => device_stat = stat,
+                            Err(err) => eprintln!("zfs_pool_stats({:?}) failed - {}", pool, err),
+                        }
+                    }
+                }
+                _ => {
+                    if let Ok(disk) = disk_manager.clone().disk_by_dev_num(device.into_dev_t()) {
+                        match disk.read_stat() {
+                            Ok(stat) => device_stat = stat,
+                            Err(err) => eprintln!("disk.read_stat {:?} failed - {}", path, err),
+                        }
+                    }
+                }
+            }
+            if let Some(stat) = device_stat {
+                let rrd_key = format!("{}/read_ios", rrd_prefix);
+                rrd_update_derive(&rrd_key, stat.read_ios as f64, save);
+                let rrd_key = format!("{}/read_bytes", rrd_prefix);
+                rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64, save);
 
-    let mut result = None;
-    let mut match_len = 0;
+                let rrd_key = format!("{}/write_ios", rrd_prefix);
+                rrd_update_derive(&rrd_key, stat.write_ios as f64, save);
+                let rrd_key = format!("{}/write_bytes", rrd_prefix);
+                rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64, save);
 
-    let root_path = std::path::Path::new("/");
-    for (_id, entry) in mountinfo {
-        if entry.root == root_path && path.starts_with(&entry.mount_point) {
-            let len = entry.mount_point.as_path().as_os_str().len();
-            if len > match_len {
-                match_len = len;
-                result = Some(entry.device);
+                let rrd_key = format!("{}/io_ticks", rrd_prefix);
+                rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0, save);
             }
         }
+        Err(err) => {
+            eprintln!("find_mounted_device failed - {}", err);
+        }
     }
-
-    result
 }