use std::sync::{Arc};
use std::path::{Path, PathBuf};
+use std::os::unix::io::AsRawFd;
use anyhow::{bail, format_err, Error};
use futures::*;
use proxmox_backup::configdir;
use proxmox_backup::buildcfg;
use proxmox_backup::server;
-use proxmox_backup::tools::daemon;
use proxmox_backup::server::{ApiConfig, rest::*};
use proxmox_backup::auth_helpers::*;
-use proxmox_backup::tools::disks::{ DiskManage, zfs_pool_stats };
+use proxmox_backup::tools::{
+ daemon,
+ disks::{
+ DiskManage,
+ zfs_pool_stats,
+ },
+ socket::{
+ set_tcp_keepalive,
+ PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
+ },
+};
use proxmox_backup::api2::pull::do_sync_job;
+use proxmox_backup::backup::do_verification_job;
fn main() -> Result<(), Error> {
proxmox_backup::tools::setup_safe_path_env();
config.register_template("index", &indexpath)?;
config.register_template("console", "/usr/share/pve-xtermjs/index.html.hbs")?;
+ config.enable_file_log(buildcfg::API_ACCESS_LOG_FN)?;
+
let rest_server = RestServer::new(config);
//openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
let key_path = configdir!("/proxy.key");
let cert_path = configdir!("/proxy.pem");
- let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
+ let mut acceptor = SslAcceptor::mozilla_intermediate_v5(SslMethod::tls()).unwrap();
acceptor.set_private_key_file(key_path, SslFiletype::PEM)
.map_err(|err| format_err!("unable to read proxy key {} - {}", key_path, err))?;
acceptor.set_certificate_chain_file(cert_path)
let acceptor = Arc::clone(&acceptor);
async move {
sock.set_nodelay(true).unwrap();
+
+ let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
+
Ok(tokio_openssl::accept(&acceptor, sock)
.await
            .ok() // handshake errors aren't fatal, so return None to filter
schedule_datastore_garbage_collection().await;
schedule_datastore_prune().await;
- schedule_datastore_verification().await;
schedule_datastore_sync_jobs().await;
+ schedule_datastore_verify_jobs().await;
+ schedule_task_log_rotate().await;
Ok(())
}
-fn lookup_last_worker(worker_type: &str, worker_id: &str) -> Result<Option<server::UPID>, Error> {
-
- let list = proxmox_backup::server::read_task_list()?;
-
- let mut last: Option<&server::UPID> = None;
-
- for entry in list.iter() {
- if entry.upid.worker_type == worker_type {
- if let Some(ref id) = entry.upid.worker_id {
- if id == worker_id {
- match last {
- Some(ref upid) => {
- if upid.starttime < entry.upid.starttime {
- last = Some(&entry.upid)
- }
- }
- None => {
- last = Some(&entry.upid)
- }
- }
- }
- }
- }
- }
-
- Ok(last.cloned())
-}
-
-
async fn schedule_datastore_garbage_collection() {
use proxmox_backup::backup::DataStore;
use proxmox_backup::server::{UPID, WorkerTask};
- use proxmox_backup::config::datastore::{self, DataStoreConfig};
+ use proxmox_backup::config::{
+ jobstate::{self, Job},
+ datastore::{self, DataStoreConfig}
+ };
use proxmox_backup::tools::systemd::time::{
parse_calendar_event, compute_next_event};
}
}
} else {
- match lookup_last_worker(worker_type, &store) {
- Ok(Some(upid)) => upid.starttime,
- Ok(None) => 0,
+ match jobstate::last_run_time(worker_type, &store) {
+ Ok(time) => time,
Err(err) => {
- eprintln!("lookup_last_job_start failed: {}", err);
+ eprintln!("could not get last run time of {} {}: {}", worker_type, store, err);
continue;
}
}
if next > now { continue; }
+ let mut job = match Job::new(worker_type, &store) {
+ Ok(job) => job,
+ Err(_) => continue, // could not get lock
+ };
+
let store2 = store.clone();
if let Err(err) = WorkerTask::new_thread(
Userid::backup_userid().clone(),
false,
move |worker| {
+ job.start(&worker.upid().to_string())?;
+
worker.log(format!("starting garbage collection on store {}", store));
worker.log(format!("task triggered by schedule '{}'", event_str));
- datastore.garbage_collection(&worker)
+
+ let result = datastore.garbage_collection(&*worker, worker.upid());
+
+ let status = worker.create_state(&result);
+
+ if let Err(err) = job.finish(status) {
+ eprintln!("could not finish job state for {}: {}", worker_type, err);
+ }
+
+ result
}
) {
eprintln!("unable to start garbage collection on store {} - {}", store2, err);
job.start(&worker.upid().to_string())?;
- let result = {
+ let result = try_block!({
worker.log(format!("Starting datastore prune on store \"{}\"", store));
worker.log(format!("task triggered by schedule '{}'", event_str));
}
}
Ok(())
- };
+ });
let status = worker.create_state(&result);
}
}
-async fn schedule_datastore_verification() {
- use proxmox_backup::backup::{DataStore, verify_all_backups};
- use proxmox_backup::server::{WorkerTask};
- use proxmox_backup::config::datastore::{self, DataStoreConfig};
- use proxmox_backup::tools::systemd::time::{
- parse_calendar_event, compute_next_event};
+async fn schedule_datastore_sync_jobs() {
- let config = match datastore::config() {
+ use proxmox_backup::{
+ config::{ sync::{self, SyncJobConfig}, jobstate::{self, Job} },
+ tools::systemd::time::{ parse_calendar_event, compute_next_event },
+ };
+
+ let config = match sync::config() {
Err(err) => {
- eprintln!("unable to read datastore config - {}", err);
+ eprintln!("unable to read sync job config - {}", err);
return;
}
Ok((config, _digest)) => config,
};
- for (store, (_, store_config)) in config.sections {
- let datastore = match DataStore::lookup_datastore(&store) {
- Ok(datastore) => datastore,
- Err(err) => {
- eprintln!("lookup_datastore failed - {}", err);
- continue;
- }
- };
-
- let store_config: DataStoreConfig = match serde_json::from_value(store_config) {
+ for (job_id, (_, job_config)) in config.sections {
+ let job_config: SyncJobConfig = match serde_json::from_value(job_config) {
Ok(c) => c,
Err(err) => {
- eprintln!("datastore config from_value failed - {}", err);
+ eprintln!("sync job config from_value failed - {}", err);
continue;
}
};
- let event_str = match store_config.verify_schedule {
- Some(event_str) => event_str,
+ let event_str = match job_config.schedule {
+ Some(ref event_str) => event_str.clone(),
None => continue,
};
}
};
- let worker_type = "verify";
+ let worker_type = "syncjob";
- let last = match lookup_last_worker(worker_type, &store) {
- Ok(Some(upid)) => {
- if proxmox_backup::server::worker_is_active_local(&upid) {
- continue;
- }
- upid.starttime
- }
- Ok(None) => 0,
+ let last = match jobstate::last_run_time(worker_type, &job_id) {
+ Ok(time) => time,
Err(err) => {
- eprintln!("lookup_last_job_start failed: {}", err);
+ eprintln!("could not get last run time of {} {}: {}", worker_type, job_id, err);
continue;
}
};
let now = proxmox::tools::time::epoch_i64();
- if next > now { continue; }
+ if next > now { continue; }
- let worker_id = store.clone();
- let store2 = store.clone();
- if let Err(err) = WorkerTask::new_thread(
- worker_type,
- Some(worker_id),
- Userid::backup_userid().clone(),
- false,
- move |worker| {
- worker.log(format!("starting verification on store {}", store2));
- worker.log(format!("task triggered by schedule '{}'", event_str));
- if let Ok(failed_dirs) = verify_all_backups(datastore, worker.clone()) {
- if failed_dirs.len() > 0 {
- worker.log("Failed to verify following snapshots:");
- for dir in failed_dirs {
- worker.log(format!("\t{}", dir));
- }
- bail!("verification failed - please check the log for details");
- }
- }
- Ok(())
- },
- ) {
- eprintln!("unable to start verification on store {} - {}", store, err);
+ let job = match Job::new(worker_type, &job_id) {
+ Ok(job) => job,
+ Err(_) => continue, // could not get lock
+ };
+
+ let userid = Userid::backup_userid().clone();
+
+ if let Err(err) = do_sync_job(job, job_config, &userid, Some(event_str)) {
+ eprintln!("unable to start datastore sync job {} - {}", &job_id, err);
}
}
}
-async fn schedule_datastore_sync_jobs() {
-
+async fn schedule_datastore_verify_jobs() {
use proxmox_backup::{
- config::{ sync::{self, SyncJobConfig}, jobstate::{self, Job} },
- tools::systemd::time::{ parse_calendar_event, compute_next_event },
+ config::{verify::{self, VerificationJobConfig}, jobstate::{self, Job}},
+ tools::systemd::time::{parse_calendar_event, compute_next_event},
};
-
- let config = match sync::config() {
+ let config = match verify::config() {
Err(err) => {
- eprintln!("unable to read sync job config - {}", err);
+ eprintln!("unable to read verification job config - {}", err);
return;
}
Ok((config, _digest)) => config,
};
-
for (job_id, (_, job_config)) in config.sections {
- let job_config: SyncJobConfig = match serde_json::from_value(job_config) {
+ let job_config: VerificationJobConfig = match serde_json::from_value(job_config) {
Ok(c) => c,
Err(err) => {
- eprintln!("sync job config from_value failed - {}", err);
+ eprintln!("verification job config from_value failed - {}", err);
continue;
}
};
-
let event_str = match job_config.schedule {
Some(ref event_str) => event_str.clone(),
None => continue,
};
-
let event = match parse_calendar_event(&event_str) {
Ok(event) => event,
Err(err) => {
continue;
}
};
-
- let worker_type = "syncjob";
-
+ let worker_type = "verificationjob";
let last = match jobstate::last_run_time(worker_type, &job_id) {
Ok(time) => time,
Err(err) => {
continue;
}
};
-
let next = match compute_next_event(&event, last, false) {
Ok(Some(next)) => next,
Ok(None) => continue,
continue;
}
};
-
let now = proxmox::tools::time::epoch_i64();
-
- if next > now { continue; }
-
+ if next > now { continue; }
let job = match Job::new(worker_type, &job_id) {
Ok(job) => job,
Err(_) => continue, // could not get lock
};
-
let userid = Userid::backup_userid().clone();
+ if let Err(err) = do_verification_job(job, job_config, &userid, Some(event_str)) {
+ eprintln!("unable to start datastore verification job {} - {}", &job_id, err);
+ }
+ }
+}
- if let Err(err) = do_sync_job(job, job_config, &userid, Some(event_str)) {
- eprintln!("unable to start datastore sync job {} - {}", &job_id, err);
+async fn schedule_task_log_rotate() {
+    // Rotates the task log archive on a fixed daily (00:00) schedule,
+    // tracked through the generic jobstate machinery so the last run time
+    // survives daemon restarts. Takes no parameters and reports all errors
+    // via eprintln! rather than propagating them (scheduler must keep running).
+    use proxmox_backup::{
+        config::jobstate::{self, Job},
+        server::rotate_task_log_archive,
+    };
+    use proxmox_backup::server::WorkerTask;
+    use proxmox_backup::tools::systemd::time::{
+        parse_calendar_event, compute_next_event};
+
+    let worker_type = "logrotate";
+    let job_id = "task-archive";
+
+    let last = match jobstate::last_run_time(worker_type, job_id) {
+        Ok(time) => time,
+        Err(err) => {
+            eprintln!("could not get last run time of task log archive rotation: {}", err);
+            return;
+        }
+    };
+
+    // schedule daily at 00:00 like normal logrotate
+    let schedule = "00:00";
+
+    let event = match parse_calendar_event(schedule) {
+        Ok(event) => event,
+        Err(err) => {
+            // should not happen? (schedule is a hard-coded, valid calendar event)
+            eprintln!("unable to parse schedule '{}' - {}", schedule, err);
+            return;
        }
+    };
+
+    let next = match compute_next_event(&event, last, false) {
+        Ok(Some(next)) => next,
+        Ok(None) => return,
+        Err(err) => {
+            eprintln!("compute_next_event for '{}' failed - {}", schedule, err);
+            return;
+        }
+    };
+
+    let now = proxmox::tools::time::epoch_i64();
+
+    if next > now {
+        // not due yet — but if we never ran the rotation, schedule instantly
+        match jobstate::JobState::load(worker_type, job_id) {
+            Ok(state) => match state {
+                jobstate::JobState::Created { .. } => {},
+                _ => return,
+            },
+            _ => return,
+        }
+    }
+
+    let mut job = match Job::new(worker_type, job_id) {
+        Ok(job) => job,
+        Err(_) => return, // could not get lock
+    };
+
+    if let Err(err) = WorkerTask::new_thread(
+        worker_type,
+        Some(job_id.to_string()),
+        Userid::backup_userid().clone(),
+        false,
+        move |worker| {
+            job.start(&worker.upid().to_string())?;
+            // plain String instead of no-argument format! (clippy: useless_format)
+            worker.log("starting task log rotation".to_string());
+
+            let result = try_block!({
+                // rotate task log archive
+                let max_size = 500000; // a normal entry has about 100b, so ~ 5000 entries/file
+                let max_files = 20; // times twenty files gives at least 100000 task entries
+                let has_rotated = rotate_task_log_archive(max_size, true, Some(max_files))?;
+                if has_rotated {
+                    worker.log("task log archive was rotated".to_string());
+                } else {
+                    worker.log("task log archive was not rotated".to_string());
+                }
+
+                Ok(())
+            });
+
+            let status = worker.create_state(&result);
+
+            // record the outcome in the job state file; failure to persist is
+            // logged but must not mask the worker result
+            if let Err(err) = job.finish(status) {
+                eprintln!("could not finish job state for {}: {}", worker_type, err);
+            }
+
+            result
+        },
+    ) {
+        eprintln!("unable to start task log rotation: {}", err);
    }
+
}
async fn run_stat_generator() {