use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

use anyhow::{bail, format_err, Error};
use futures::*;
use http::request::Parts;
use http::Response;
use hyper::header;
use hyper::{Body, StatusCode};
use url::form_urlencoded;

use openssl::ssl::SslAcceptor;
use serde_json::{json, Value};

use proxmox_lang::try_block;
use proxmox_metrics::MetricsData;
use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
use proxmox_sys::fs::{CreateOptions, FileSystemInformation};
use proxmox_sys::linux::procfs::{Loadavg, ProcFsMemInfo, ProcFsNetDev, ProcFsStat};
use proxmox_sys::logrotate::LogRotate;
use proxmox_sys::{task_log, task_warn};

use pbs_datastore::DataStore;

use proxmox_rest_server::{
    cleanup_old_tasks, cookie_from_header, rotate_task_log_archive, ApiConfig, RestEnvironment,
    RestServer, WorkerTask,
};

use proxmox_backup::rrd_cache::{
    initialize_rrd_cache, rrd_sync_journal, rrd_update_derive, rrd_update_gauge,
};
use proxmox_backup::{
    server::{
        auth::check_pbs_auth,
        jobstate::{self, Job},
    },
    tools::disks::BlockDevStat,
    traffic_control_cache::{SharedRateLimit, TRAFFIC_CONTROL_CACHE},
};

use pbs_buildcfg::configdir;
use proxmox_time::CalendarEvent;

use pbs_api_types::{
    Authid, DataStoreConfig, Operation, PruneJobConfig, SyncJobConfig, TapeBackupJobConfig,
    VerificationJobConfig,
};

use proxmox_rest_server::daemon;

use proxmox_backup::auth_helpers::*;
use proxmox_backup::server;
use proxmox_backup::tools::{
    disks::{zfs_dataset_stats, DiskManage},
    PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
};

use proxmox_backup::api2::pull::do_sync_job;
use proxmox_backup::api2::tape::backup::do_tape_backup_job;
use proxmox_backup::server::do_prune_job;
use proxmox_backup::server::do_verification_job;

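/// Entry point: tune libc malloc options, sanitize $PATH, verify the process
/// runs as the backup user and group, then hand control to the async runtime.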
fn main() -> Result<(), Error> {
    pbs_tools::setup_libc_malloc_opts();

    proxmox_backup::tools::setup_safe_path_env();

    let backup_uid = pbs_config::backup_user()?.uid;
    let backup_gid = pbs_config::backup_group()?.gid;
    let running_uid = nix::unistd::Uid::effective();
    let running_gid = nix::unistd::Gid::effective();

    if running_uid != backup_uid || running_gid != backup_gid {
        bail!(
            "proxy not running as backup user or group (got uid {running_uid} gid {running_gid})"
        );
    }

    proxmox_async::runtime::main(run())
}

/// Check for a cookie with the user-preferred language; fall back to the configured default
/// if the cookie is not set or no translation file exists for that language.
fn get_language(headers: &http::HeaderMap) -> String {
    let exists = |l: &str| Path::new(&format!("/usr/share/pbs-i18n/pbs-lang-{l}.js")).exists();

    match cookie_from_header(headers, "PBSLangCookie") {
        Some(cookie_lang) if exists(&cookie_lang) => cookie_lang,
        _ => match proxmox_backup::config::node::config().map(|(cfg, _)| cfg.default_lang) {
            Ok(Some(default_lang)) if exists(&default_lang) => default_lang,
            _ => String::from(""),
        },
    }
}

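/// Render the GUI index page (or the xterm.js console template), embedding the
/// node name, the authenticated user, a fresh CSRF token, the UI language and
/// the debug flag into the template data.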
async fn get_index_future(env: RestEnvironment, parts: Parts) -> Response<Body> {
    let auth_id = env.get_auth_id();
    let api = env.api_config();

    // fixme: make all IO async

    let (userid, csrf_token) = match auth_id {
        Some(auth_id) => {
            let auth_id = auth_id.parse::<Authid>();
            match auth_id {
                Ok(auth_id) if !auth_id.is_token() => {
                    let userid = auth_id.user().clone();
                    let new_csrf_token = assemble_csrf_prevention_token(csrf_secret(), &userid);
                    (Some(userid), Some(new_csrf_token))
                }
                _ => (None, None),
            }
        }
        None => (None, None),
    };

    let nodename = proxmox_sys::nodename();
    let user = userid.as_ref().map(|u| u.as_str()).unwrap_or("");

    let csrf_token = csrf_token.unwrap_or_else(|| String::from(""));

    let mut debug = false;
    let mut template_file = "index";

    if let Some(query_str) = parts.uri.query() {
        for (k, v) in form_urlencoded::parse(query_str.as_bytes()).into_owned() {
            if k == "debug" && v != "0" && v != "false" {
                debug = true;
            } else if k == "console" {
                template_file = "console";
            }
        }
    }

    let data = json!({
        "NodeName": nodename,
        "UserName": user,
        "CSRFPreventionToken": csrf_token,
        "language": get_language(&parts.headers),
        "debug": debug,
    });

    let (ct, index) = match api.render_template(template_file, &data) {
        Ok(index) => ("text/html", index),
        Err(err) => ("text/plain", format!("Error rendering template: {err}")),
    };

    let mut resp = Response::builder()
        .status(StatusCode::OK)
        .header(header::CONTENT_TYPE, ct)
        .body(index.into())
        .unwrap();

    if let Some(userid) = userid {
        resp.extensions_mut().insert(Authid::from((userid, None)));
    }

    resp
}

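/// Set up and run the proxy daemon: initialize syslog, build the REST API
/// configuration with its index/auth handlers, templates and directory
/// aliases, create the TLS acceptor and command socket, then serve on port
/// 8007 until shutdown and wait for active workers to finish.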
async fn run() -> Result<(), Error> {
    // Note: To debug early connection errors, use
    // PROXMOX_DEBUG=1 ./target/release/proxmox-backup-proxy
    let debug = std::env::var("PROXMOX_DEBUG").is_ok();

    if let Err(err) = syslog::init(
        syslog::Facility::LOG_DAEMON,
        if debug {
            log::LevelFilter::Debug
        } else {
            log::LevelFilter::Info
        },
        Some("proxmox-backup-proxy"),
    ) {
        bail!("unable to initialize syslog - {err}");
    }

    proxmox_backup::auth_helpers::setup_auth_context(false);

    let rrd_cache = initialize_rrd_cache()?;
    rrd_cache.apply_journal()?;

    let mut indexpath = PathBuf::from(pbs_buildcfg::JS_DIR);
    indexpath.push("index.hbs");

    let mut config = ApiConfig::new(pbs_buildcfg::JS_DIR, RpcEnvironmentType::PUBLIC)
        .index_handler_func(|e, p| Box::pin(get_index_future(e, p)))
        .auth_handler_func(|h, m| Box::pin(check_pbs_auth(h, m)))
        .register_template("index", &indexpath)?
        .register_template("console", "/usr/share/pve-xtermjs/index.html.hbs")?
        .default_api2_handler(&proxmox_backup::api2::ROUTER)
        .aliases([
            ("novnc", "/usr/share/novnc-pve"),
            ("extjs", "/usr/share/javascript/extjs"),
            ("qrcodejs", "/usr/share/javascript/qrcodejs"),
            ("fontawesome", "/usr/share/fonts-font-awesome"),
            ("xtermjs", "/usr/share/pve-xtermjs"),
            ("locale", "/usr/share/pbs-i18n"),
            (
                "widgettoolkit",
                "/usr/share/javascript/proxmox-widget-toolkit",
            ),
            ("docs", "/usr/share/doc/proxmox-backup/html"),
        ]);

    let backup_user = pbs_config::backup_user()?;
    let mut commando_sock = proxmox_rest_server::CommandSocket::new(
        proxmox_rest_server::our_ctrl_sock(),
        backup_user.gid,
    );

    let dir_opts = CreateOptions::new()
        .owner(backup_user.uid)
        .group(backup_user.gid);
    let file_opts = CreateOptions::new()
        .owner(backup_user.uid)
        .group(backup_user.gid);

    config = config
        .enable_access_log(
            pbs_buildcfg::API_ACCESS_LOG_FN,
            Some(dir_opts.clone()),
            Some(file_opts.clone()),
            &mut commando_sock,
        )?
        .enable_auth_log(
            pbs_buildcfg::API_AUTH_LOG_FN,
            Some(dir_opts.clone()),
            Some(file_opts.clone()),
            &mut commando_sock,
        )?;

    let rest_server = RestServer::new(config);
    proxmox_rest_server::init_worker_tasks(
        pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!().into(),
        file_opts.clone(),
    )?;

    //openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes

    // we build the initial acceptor here as we cannot start if this fails
    let acceptor = make_tls_acceptor()?;
    let acceptor = Arc::new(Mutex::new(acceptor));

    // to renew the acceptor we just add a command-socket handler
    commando_sock.register_command("reload-certificate".to_string(), {
        let acceptor = Arc::clone(&acceptor);
        move |_value| -> Result<_, Error> {
            log::info!("reloading certificate");
            match make_tls_acceptor() {
                Err(err) => log::error!("error reloading certificate: {err}"),
                Ok(new_acceptor) => {
                    let mut guard = acceptor.lock().unwrap();
                    *guard = new_acceptor;
                }
            }
            Ok(Value::Null)
        }
    })?;

    // to remove references for not configured datastores
    commando_sock.register_command("datastore-removed".to_string(), |_value| {
        if let Err(err) = DataStore::remove_unused_datastores() {
            log::error!("could not refresh datastores: {err}");
        }
        Ok(Value::Null)
    })?;

    let connections = proxmox_rest_server::connection::AcceptBuilder::with_acceptor(acceptor)
        .debug(debug)
        .rate_limiter_lookup(Arc::new(lookup_rate_limiter))
        .tcp_keepalive_time(PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
    let server = daemon::create_daemon(
        ([0, 0, 0, 0, 0, 0, 0, 0], 8007).into(),
        move |listener| {
            let connections = connections.accept(listener);

            Ok(async {
                daemon::systemd_notify(daemon::SystemdNotify::Ready)?;

                hyper::Server::builder(connections)
                    .serve(rest_server)
                    .with_graceful_shutdown(proxmox_rest_server::shutdown_future())
                    .map_err(Error::from)
                    .await
            })
        },
        Some(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN),
    );

    proxmox_rest_server::write_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;

    let init_result: Result<(), Error> = try_block!({
        proxmox_rest_server::register_task_control_commands(&mut commando_sock)?;
        commando_sock.spawn()?;
        proxmox_rest_server::catch_shutdown_signal()?;
        proxmox_rest_server::catch_reload_signal()?;
        Ok(())
    });

    if let Err(err) = init_result {
        bail!("unable to start daemon - {err}");
    }

    // stop gap for https://github.com/tokio-rs/tokio/issues/4730 where the thread holding the
    // IO-driver may block progress completely if it starts polling its own tasks (blocks).
    // As a workaround, periodically wake parked threads: since the spawned future is immediately
    // ready, the woken thread will acquire the IO driver, if blocked, before going back to sleep,
    // which allows progress again.
    // TODO: remove once tokio solves this at their level (see proposals in linked comments)
    let rt_handle = tokio::runtime::Handle::current();
    std::thread::spawn(move || loop {
        rt_handle.spawn(std::future::ready(()));
        std::thread::sleep(Duration::from_secs(3));
    });

    start_task_scheduler();
    start_stat_generator();
    start_traffic_control_updater();

    server.await?;
    log::info!("server shutting down, waiting for active workers to complete");
    proxmox_rest_server::last_worker_future().await?;
    log::info!("done - exit server");

    Ok(())
}

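/// Build a TLS acceptor from the proxy key/certificate in the configuration
/// directory, applying any TLS 1.2/1.3 cipher lists from the node config.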
fn make_tls_acceptor() -> Result<SslAcceptor, Error> {
    let key_path = configdir!("/proxy.key");
    let cert_path = configdir!("/proxy.pem");

    let (config, _) = proxmox_backup::config::node::config()?;
    let ciphers_tls_1_3 = config.ciphers_tls_1_3;
    let ciphers_tls_1_2 = config.ciphers_tls_1_2;

    let mut acceptor = proxmox_rest_server::connection::TlsAcceptorBuilder::new()
        .certificate_paths_pem(key_path, cert_path);

    //let mut acceptor = SslAcceptor::mozilla_intermediate_v5(SslMethod::tls()).unwrap();
    if let Some(ciphers) = ciphers_tls_1_3.as_deref() {
        acceptor = acceptor.cipher_suites(ciphers.to_string());
    }
    if let Some(ciphers) = ciphers_tls_1_2.as_deref() {
        acceptor = acceptor.cipher_list(ciphers.to_string());
    }

    acceptor.build()
}

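// Each of the following start_* helpers races its long-running future against
// the global shutdown future and spawns the combined task onto the runtime,
// so the loop ends cleanly once shutdown is triggered.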
fn start_stat_generator() {
    let abort_future = proxmox_rest_server::shutdown_future();
    let future = Box::pin(run_stat_generator());
    let task = futures::future::select(future, abort_future);
    tokio::spawn(task.map(|_| ()));
}

fn start_task_scheduler() {
    let abort_future = proxmox_rest_server::shutdown_future();
    let future = Box::pin(run_task_scheduler());
    let task = futures::future::select(future, abort_future);
    tokio::spawn(task.map(|_| ()));
}

fn start_traffic_control_updater() {
    let abort_future = proxmox_rest_server::shutdown_future();
    let future = Box::pin(run_traffic_control_updater());
    let task = futures::future::select(future, abort_future);
    tokio::spawn(task.map(|_| ()));
}

use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

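/// Compute the `Instant` of the next full minute, used to align scheduler
/// runs to minute boundaries. For example, at epoch second 1234567890 the next
/// boundary is (1234567890 / 60 + 1) * 60 = 1234567920, i.e. 30 seconds away.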
fn next_minute() -> Instant {
    let now = SystemTime::now();
    let epoch_now = match now.duration_since(UNIX_EPOCH) {
        Ok(d) => d,
        Err(err) => {
            eprintln!("task scheduler: compute next minute alignment failed - {err}");
            return Instant::now() + Duration::from_secs(60);
        }
    };
    let epoch_next = Duration::from_secs((epoch_now.as_secs() / 60 + 1) * 60);
    Instant::now() + epoch_next - epoch_now
}

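/// Scheduler main loop: sleep until the next minute boundary, then run all job
/// schedulers; panics are caught so one bad round cannot end the loop.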
async fn run_task_scheduler() {
    loop {
        // sleep first to align to next minute boundary for first round
        let delay_target = next_minute();
        tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;

        match schedule_tasks().catch_unwind().await {
            Err(panic) => match panic.downcast::<&str>() {
                Ok(msg) => eprintln!("task scheduler panic: {msg}"),
                Err(_) => eprintln!("task scheduler panic - unknown type"),
            },
            Ok(Err(err)) => eprintln!("task scheduler failed - {err:?}"),
            Ok(Ok(_)) => {}
        }
    }
}

async fn schedule_tasks() -> Result<(), Error> {
    schedule_datastore_garbage_collection().await;
    schedule_datastore_prune_jobs().await;
    schedule_datastore_sync_jobs().await;
    schedule_datastore_verify_jobs().await;
    schedule_tape_backup_jobs().await;
    schedule_task_log_rotate().await;

    Ok(())
}

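/// Iterate over all configured datastores and start a garbage collection
/// worker for each one whose `gc_schedule` calendar event is due, unless a GC
/// is already running or the per-job lock cannot be taken.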
async fn schedule_datastore_garbage_collection() {
    let config = match pbs_config::datastore::config() {
        Err(err) => {
            eprintln!("unable to read datastore config - {err}");
            return;
        }
        Ok((config, _digest)) => config,
    };

    for (store, (_, store_config)) in config.sections {
        let store_config: DataStoreConfig = match serde_json::from_value(store_config) {
            Ok(c) => c,
            Err(err) => {
                eprintln!("datastore config from_value failed - {err}");
                continue;
            }
        };

        let event_str = match store_config.gc_schedule {
            Some(event_str) => event_str,
            None => continue,
        };

        let event: CalendarEvent = match event_str.parse() {
            Ok(event) => event,
            Err(err) => {
                eprintln!("unable to parse schedule '{event_str}' - {err}");
                continue;
            }
        };

        {
            // limit datastore scope due to Op::Lookup
            let datastore = match DataStore::lookup_datastore(&store, Some(Operation::Lookup)) {
                Ok(datastore) => datastore,
                Err(err) => {
                    eprintln!("lookup_datastore failed - {err}");
                    continue;
                }
            };

            if datastore.garbage_collection_running() {
                continue;
            }
        }

        let worker_type = "garbage_collection";

        let last = match jobstate::last_run_time(worker_type, &store) {
            Ok(time) => time,
            Err(err) => {
                eprintln!("could not get last run time of {worker_type} {store}: {err}");
                continue;
            }
        };

        let next = match event.compute_next_event(last) {
            Ok(Some(next)) => next,
            Ok(None) => continue,
            Err(err) => {
                eprintln!("compute_next_event for '{event_str}' failed - {err}");
                continue;
            }
        };

        let now = proxmox_time::epoch_i64();

        if next > now {
            continue;
        }

        let job = match Job::new(worker_type, &store) {
            Ok(job) => job,
            Err(_) => continue, // could not get lock
        };

        let datastore = match DataStore::lookup_datastore(&store, Some(Operation::Write)) {
            Ok(datastore) => datastore,
            Err(err) => {
                log::warn!("skipping scheduled GC on {store}, could not look it up - {err}");
                continue;
            }
        };

        let auth_id = Authid::root_auth_id();

        if let Err(err) = crate::server::do_garbage_collection_job(
            job,
            datastore,
            auth_id,
            Some(event_str),
            false,
        ) {
            eprintln!("unable to start garbage collection job on datastore {store} - {err}");
        }
    }
}

async fn schedule_datastore_prune_jobs() {
    let config = match pbs_config::prune::config() {
        Err(err) => {
            eprintln!("unable to read prune job config - {err}");
            return;
        }
        Ok((config, _digest)) => config,
    };
    for (job_id, (_, job_config)) in config.sections {
        let job_config: PruneJobConfig = match serde_json::from_value(job_config) {
            Ok(c) => c,
            Err(err) => {
                eprintln!("prune job config from_value failed - {err}");
                continue;
            }
        };

        if job_config.disable {
            continue;
        }

        if !job_config.options.keeps_something() {
            continue; // no 'keep' values set, keep all
        }

        let worker_type = "prunejob";
        let auth_id = Authid::root_auth_id().clone();
        if check_schedule(worker_type, &job_config.schedule, &job_id) {
            let job = match Job::new(worker_type, &job_id) {
                Ok(job) => job,
                Err(_) => continue, // could not get lock
            };
            if let Err(err) = do_prune_job(
                job,
                job_config.options,
                job_config.store,
                &auth_id,
                Some(job_config.schedule),
            ) {
                eprintln!("unable to start datastore prune job {job_id} - {err}");
            }
        };
    }
}

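// The sync, verify and tape backup schedulers below share one pattern: parse
// each job config, skip jobs without a schedule, check whether the calendar
// event is due via check_schedule(), and only start a worker if the per-job
// lock can be acquired.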
async fn schedule_datastore_sync_jobs() {
    let config = match pbs_config::sync::config() {
        Err(err) => {
            eprintln!("unable to read sync job config - {err}");
            return;
        }
        Ok((config, _digest)) => config,
    };

    for (job_id, (_, job_config)) in config.sections {
        let job_config: SyncJobConfig = match serde_json::from_value(job_config) {
            Ok(c) => c,
            Err(err) => {
                eprintln!("sync job config from_value failed - {err}");
                continue;
            }
        };

        let event_str = match job_config.schedule {
            Some(ref event_str) => event_str.clone(),
            None => continue,
        };

        let worker_type = "syncjob";
        if check_schedule(worker_type, &event_str, &job_id) {
            let job = match Job::new(worker_type, &job_id) {
                Ok(job) => job,
                Err(_) => continue, // could not get lock
            };

            let auth_id = Authid::root_auth_id().clone();
            if let Err(err) = do_sync_job(job, job_config, &auth_id, Some(event_str), false) {
                eprintln!("unable to start datastore sync job {job_id} - {err}");
            }
        };
    }
}

async fn schedule_datastore_verify_jobs() {
    let config = match pbs_config::verify::config() {
        Err(err) => {
            eprintln!("unable to read verification job config - {err}");
            return;
        }
        Ok((config, _digest)) => config,
    };
    for (job_id, (_, job_config)) in config.sections {
        let job_config: VerificationJobConfig = match serde_json::from_value(job_config) {
            Ok(c) => c,
            Err(err) => {
                eprintln!("verification job config from_value failed - {err}");
                continue;
            }
        };
        let event_str = match job_config.schedule {
            Some(ref event_str) => event_str.clone(),
            None => continue,
        };

        let worker_type = "verificationjob";
        let auth_id = Authid::root_auth_id().clone();
        if check_schedule(worker_type, &event_str, &job_id) {
            let job = match Job::new(worker_type, &job_id) {
                Ok(job) => job,
                Err(_) => continue, // could not get lock
            };
            if let Err(err) = do_verification_job(job, job_config, &auth_id, Some(event_str), false)
            {
                eprintln!("unable to start datastore verification job {job_id} - {err}");
            }
        };
    }
}

async fn schedule_tape_backup_jobs() {
    let config = match pbs_config::tape_job::config() {
        Err(err) => {
            eprintln!("unable to read tape job config - {err}");
            return;
        }
        Ok((config, _digest)) => config,
    };
    for (job_id, (_, job_config)) in config.sections {
        let job_config: TapeBackupJobConfig = match serde_json::from_value(job_config) {
            Ok(c) => c,
            Err(err) => {
                eprintln!("tape backup job config from_value failed - {err}");
                continue;
            }
        };
        let event_str = match job_config.schedule {
            Some(ref event_str) => event_str.clone(),
            None => continue,
        };

        let worker_type = "tape-backup-job";
        let auth_id = Authid::root_auth_id().clone();
        if check_schedule(worker_type, &event_str, &job_id) {
            let job = match Job::new(worker_type, &job_id) {
                Ok(job) => job,
                Err(_) => continue, // could not get lock
            };
            if let Err(err) =
                do_tape_backup_job(job, job_config.setup, &auth_id, Some(event_str), false)
            {
                eprintln!("unable to start tape backup job {job_id} - {err}");
            }
        };
    }
}

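/// Rotate the task log archive and the API access/auth logs, scheduled daily
/// at 00:00; additionally runs immediately if the job has never run before.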
async fn schedule_task_log_rotate() {
    let worker_type = "logrotate";
    let job_id = "access-log_and_task-archive";

    // schedule daily at 00:00 like normal logrotate
    let schedule = "00:00";

    if !check_schedule(worker_type, schedule, job_id) {
        // if we never ran the rotation, schedule instantly
        match jobstate::JobState::load(worker_type, job_id) {
            Ok(jobstate::JobState::Created { .. }) => {}
            _ => return,
        }
    }

    let mut job = match Job::new(worker_type, job_id) {
        Ok(job) => job,
        Err(_) => return, // could not get lock
    };

    if let Err(err) = WorkerTask::new_thread(
        worker_type,
        None,
        Authid::root_auth_id().to_string(),
        false,
        move |worker| {
            job.start(&worker.upid().to_string())?;
            task_log!(worker, "starting task log rotation");

            let result = try_block!({
                let max_size = 512 * 1024 - 1; // an entry has ~ 100b, so > 5000 entries/file
                let max_files = 20; // times twenty files gives > 100000 task entries

                let max_days = proxmox_backup::config::node::config()
                    .map(|(cfg, _)| cfg.task_log_max_days)
                    .ok()
                    .flatten();

                let user = pbs_config::backup_user()?;
                let options = proxmox_sys::fs::CreateOptions::new()
                    .owner(user.uid)
                    .group(user.gid);

                let has_rotated = rotate_task_log_archive(
                    max_size,
                    true,
                    Some(max_files),
                    max_days,
                    Some(options.clone()),
                )?;

                if has_rotated {
                    task_log!(worker, "task log archive was rotated");
                } else {
                    task_log!(worker, "task log archive was not rotated");
                }

                let max_size = 32 * 1024 * 1024 - 1;
                let max_files = 14;

                let mut logrotate = LogRotate::new(
                    pbs_buildcfg::API_ACCESS_LOG_FN,
                    true,
                    Some(max_files),
                    Some(options.clone()),
                )?;

                if logrotate.rotate(max_size)? {
                    println!("rotated access log, telling daemons to re-open log file");
                    proxmox_async::runtime::block_on(command_reopen_access_logfiles())?;
                    task_log!(worker, "API access log was rotated");
                } else {
                    task_log!(worker, "API access log was not rotated");
                }

                let mut logrotate = LogRotate::new(
                    pbs_buildcfg::API_AUTH_LOG_FN,
                    true,
                    Some(max_files),
                    Some(options),
                )?;

                if logrotate.rotate(max_size)? {
                    println!("rotated auth log, telling daemons to re-open log file");
                    proxmox_async::runtime::block_on(command_reopen_auth_logfiles())?;
                    task_log!(worker, "API authentication log was rotated");
                } else {
                    task_log!(worker, "API authentication log was not rotated");
                }

                if has_rotated {
                    task_log!(worker, "cleaning up old task logs");
                    if let Err(err) = cleanup_old_tasks(&worker, true) {
                        task_warn!(worker, "could not completely cleanup old tasks: {err}");
                    }
                }

                Ok(())
            });

            let status = worker.create_state(&result);

            if let Err(err) = job.finish(status) {
                eprintln!("could not finish job state for {worker_type}: {err}");
            }

            result
        },
    ) {
        eprintln!("unable to start task log rotation: {err}");
    }
}

async fn command_reopen_access_logfiles() -> Result<(), Error> {
    // only care about the most recent daemon instance for each, proxy & api, as other older ones
    // should not respond to new requests anyway, but only finish their current one and then exit.
    let sock = proxmox_rest_server::our_ctrl_sock();
    let f1 =
        proxmox_rest_server::send_raw_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");

    let pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
    let sock = proxmox_rest_server::ctrl_sock_from_pid(pid);
    let f2 =
        proxmox_rest_server::send_raw_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");

    match futures::join!(f1, f2) {
        (Err(e1), Err(e2)) => Err(format_err!(
            "reopen commands failed, proxy: {e1}; api: {e2}"
        )),
        (Err(e1), Ok(_)) => Err(format_err!("reopen commands failed, proxy: {e1}")),
        (Ok(_), Err(e2)) => Err(format_err!("reopen commands failed, api: {e2}")),
        _ => Ok(()),
    }
}

async fn command_reopen_auth_logfiles() -> Result<(), Error> {
    // only care about the most recent daemon instance for each, proxy & api, as other older ones
    // should not respond to new requests anyway, but only finish their current one and then exit.
    let sock = proxmox_rest_server::our_ctrl_sock();
    let f1 = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"api-auth-log-reopen\"}\n");

    let pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
    let sock = proxmox_rest_server::ctrl_sock_from_pid(pid);
    let f2 = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"api-auth-log-reopen\"}\n");

    match futures::join!(f1, f2) {
        (Err(e1), Err(e2)) => Err(format_err!(
            "reopen commands failed, proxy: {e1}; api: {e2}"
        )),
        (Err(e1), Ok(_)) => Err(format_err!("reopen commands failed, proxy: {e1}")),
        (Ok(_), Err(e2)) => Err(format_err!("reopen commands failed, api: {e2}")),
        _ => Ok(()),
    }
}

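/// Stat generator main loop: every 10 seconds, collect host and disk stats on
/// a blocking thread, then feed them to the RRD cache and to any configured
/// metric servers concurrently.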
async fn run_stat_generator() {
    loop {
        let delay_target = Instant::now() + Duration::from_secs(10);

        let stats = match tokio::task::spawn_blocking(|| {
            let hoststats = collect_host_stats_sync();
            let (hostdisk, datastores) = collect_disk_stats_sync();
            Arc::new((hoststats, hostdisk, datastores))
        })
        .await
        {
            Ok(res) => res,
            Err(err) => {
                log::error!("collecting host stats panicked: {err}");
                tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
                continue;
            }
        };

        let rrd_future = tokio::task::spawn_blocking({
            let stats = Arc::clone(&stats);
            move || {
                rrd_update_host_stats_sync(&stats.0, &stats.1, &stats.2);
                rrd_sync_journal();
            }
        });

        let metrics_future = send_data_to_metric_servers(stats);

        let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
        if let Err(err) = rrd_res {
            log::error!("rrd update panicked: {err}");
        }
        if let Err(err) = metrics_res {
            log::error!("error during metrics sending: {err}");
        }

        tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
    }
}

async fn send_data_to_metric_servers(
    stats: Arc<(HostStats, DiskStat, Vec<DiskStat>)>,
) -> Result<(), Error> {
    let (config, _digest) = pbs_config::metrics::config()?;
    let channel_list = get_metric_server_connections(config)?;

    if channel_list.is_empty() {
        return Ok(());
    }

    let ctime = proxmox_time::epoch_i64();
    let nodename = proxmox_sys::nodename();

    let mut values = Vec::new();

    let mut cpuvalue = match &stats.0.proc {
        Some(stat) => serde_json::to_value(stat)?,
        None => json!({}),
    };

    if let Some(loadavg) = &stats.0.load {
        cpuvalue["avg1"] = Value::from(loadavg.0);
        cpuvalue["avg5"] = Value::from(loadavg.1);
        cpuvalue["avg15"] = Value::from(loadavg.2);
    }

    values.push(Arc::new(
        MetricsData::new("cpustat", ctime, cpuvalue)?
            .tag("object", "host")
            .tag("host", nodename),
    ));

    if let Some(stat) = &stats.0.meminfo {
        values.push(Arc::new(
            MetricsData::new("memory", ctime, stat)?
                .tag("object", "host")
                .tag("host", nodename),
        ));
    }

    if let Some(netdev) = &stats.0.net {
        for item in netdev {
            values.push(Arc::new(
                MetricsData::new("nics", ctime, item)?
                    .tag("object", "host")
                    .tag("host", nodename)
                    .tag("instance", item.device.clone()),
            ));
        }
    }

    values.push(Arc::new(
        MetricsData::new("blockstat", ctime, stats.1.to_value())?
            .tag("object", "host")
            .tag("host", nodename),
    ));

    for datastore in stats.2.iter() {
        values.push(Arc::new(
            MetricsData::new("blockstat", ctime, datastore.to_value())?
                .tag("object", "host")
                .tag("host", nodename)
                .tag("datastore", datastore.name.clone()),
        ));
    }

    // we must use a concrete function here, because the lifetime inferred for a
    // closure is not general enough for the tokio::spawn call we are in...
    fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics {
        &item.0
    }

    let results =
        proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await;
    for (res, name) in results
        .into_iter()
        .zip(channel_list.iter().map(|(_, name)| name))
    {
        if let Err(err) = res {
            log::error!("error sending into channel of {name}: {err}");
        }
    }

    futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move {
        if let Err(err) = channel.join().await {
            log::error!("error sending to metric server {name}: {err}");
        }
    }))
    .await;

    Ok(())
}

/// Get the metric server connections from a config
pub fn get_metric_server_connections(
    metric_config: proxmox_section_config::SectionConfigData,
) -> Result<Vec<(proxmox_metrics::Metrics, String)>, Error> {
    let mut res = Vec::new();

    for config in
        metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbUdp>("influxdb-udp")?
    {
        if !config.enable {
            continue;
        }
        let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu);
        res.push((future, config.name));
    }

    for config in
        metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbHttp>("influxdb-http")?
    {
        if !config.enable {
            continue;
        }
        let future = proxmox_metrics::influxdb_http(
            &config.url,
            config.organization.as_deref().unwrap_or("proxmox"),
            config.bucket.as_deref().unwrap_or("proxmox"),
            config.token.as_deref(),
            config.verify_tls.unwrap_or(true),
            config.max_body_size.unwrap_or(25_000_000),
        )?;
        res.push((future, config.name));
    }
    Ok(res)
}

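/// Host statistics gathered from procfs; each field is `None` if the
/// corresponding read failed (the error is logged instead).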
struct HostStats {
    proc: Option<ProcFsStat>,
    meminfo: Option<ProcFsMemInfo>,
    net: Option<Vec<ProcFsNetDev>>,
    load: Option<Loadavg>,
}

struct DiskStat {
    name: String,
    usage: Option<FileSystemInformation>,
    dev: Option<BlockDevStat>,
}

impl DiskStat {
    fn to_value(&self) -> Value {
        let mut value = json!({});
        if let Some(usage) = &self.usage {
            value["total"] = Value::from(usage.total);
            value["used"] = Value::from(usage.used);
            value["avail"] = Value::from(usage.available);
        }

        if let Some(dev) = &self.dev {
            value["read_ios"] = Value::from(dev.read_ios);
            value["read_bytes"] = Value::from(dev.read_sectors * 512);
            value["write_ios"] = Value::from(dev.write_ios);
            value["write_bytes"] = Value::from(dev.write_sectors * 512);
            value["io_ticks"] = Value::from(dev.io_ticks / 1000);
        }
        value
    }
}

fn collect_host_stats_sync() -> HostStats {
    use proxmox_sys::linux::procfs::{
        read_loadavg, read_meminfo, read_proc_net_dev, read_proc_stat,
    };

    let proc = match read_proc_stat() {
        Ok(stat) => Some(stat),
        Err(err) => {
            eprintln!("read_proc_stat failed - {err}");
            None
        }
    };

    let meminfo = match read_meminfo() {
        Ok(stat) => Some(stat),
        Err(err) => {
            eprintln!("read_meminfo failed - {err}");
            None
        }
    };

    let net = match read_proc_net_dev() {
        Ok(netdev) => Some(netdev),
        Err(err) => {
            eprintln!("read_proc_net_dev failed - {err}");
            None
        }
    };

    let load = match read_loadavg() {
        Ok(loadavg) => Some(loadavg),
        Err(err) => {
            eprintln!("read_loadavg failed - {err}");
            None
        }
    };

    HostStats {
        proc,
        meminfo,
        net,
        load,
    }
}

fn collect_disk_stats_sync() -> (DiskStat, Vec<DiskStat>) {
    let disk_manager = DiskManage::new();

    let root = gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");

    let mut datastores = Vec::new();
    match pbs_config::datastore::config() {
        Ok((config, _)) => {
            let datastore_list: Vec<DataStoreConfig> = config
                .convert_to_typed_array("datastore")
                .unwrap_or_default();

            for config in datastore_list {
                if config
                    .get_maintenance_mode()
                    .map_or(false, |mode| mode.check(Some(Operation::Read)).is_err())
                {
                    continue;
                }
                let path = std::path::Path::new(&config.path);
                datastores.push(gather_disk_stats(disk_manager.clone(), path, &config.name));
            }
        }
        Err(err) => {
            eprintln!("read datastore config failed - {err}");
        }
    }

    (root, datastores)
}

fn rrd_update_host_stats_sync(host: &HostStats, hostdisk: &DiskStat, datastores: &[DiskStat]) {
    if let Some(stat) = &host.proc {
        rrd_update_gauge("host/cpu", stat.cpu);
        rrd_update_gauge("host/iowait", stat.iowait_percent);
    }

    if let Some(meminfo) = &host.meminfo {
        rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
        rrd_update_gauge("host/memused", meminfo.memused as f64);
        rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
        rrd_update_gauge("host/swapused", meminfo.swapused as f64);
    }

    if let Some(netdev) = &host.net {
        use pbs_config::network::is_physical_nic;
        let mut netin = 0;
        let mut netout = 0;
        for item in netdev {
            if !is_physical_nic(&item.device) {
                continue;
            }
            netin += item.receive;
            netout += item.send;
        }
        rrd_update_derive("host/netin", netin as f64);
        rrd_update_derive("host/netout", netout as f64);
    }

    if let Some(loadavg) = &host.load {
        rrd_update_gauge("host/loadavg", loadavg.0 as f64);
    }

    rrd_update_disk_stat(hostdisk, "host");

    for stat in datastores {
        let rrd_prefix = format!("datastore/{}", stat.name);
        rrd_update_disk_stat(stat, &rrd_prefix);
    }
}

fn rrd_update_disk_stat(disk: &DiskStat, rrd_prefix: &str) {
    if let Some(status) = &disk.usage {
        let rrd_key = format!("{}/total", rrd_prefix);
        rrd_update_gauge(&rrd_key, status.total as f64);
        let rrd_key = format!("{}/used", rrd_prefix);
        rrd_update_gauge(&rrd_key, status.used as f64);
        let rrd_key = format!("{}/available", rrd_prefix);
        rrd_update_gauge(&rrd_key, status.available as f64);
    }

    if let Some(stat) = &disk.dev {
        let rrd_key = format!("{}/read_ios", rrd_prefix);
        rrd_update_derive(&rrd_key, stat.read_ios as f64);
        let rrd_key = format!("{}/read_bytes", rrd_prefix);
        rrd_update_derive(&rrd_key, (stat.read_sectors * 512) as f64);

        let rrd_key = format!("{}/write_ios", rrd_prefix);
        rrd_update_derive(&rrd_key, stat.write_ios as f64);
        let rrd_key = format!("{}/write_bytes", rrd_prefix);
        rrd_update_derive(&rrd_key, (stat.write_sectors * 512) as f64);

        let rrd_key = format!("{}/io_ticks", rrd_prefix);
        rrd_update_derive(&rrd_key, (stat.io_ticks as f64) / 1000.0);
    }
}

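/// Return true if the given calendar event is due, i.e. the next trigger time
/// computed from the job's last run is not in the future.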
fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
    let event: CalendarEvent = match event_str.parse() {
        Ok(event) => event,
        Err(err) => {
            eprintln!("unable to parse schedule '{event_str}' - {err}");
            return false;
        }
    };

    let last = match jobstate::last_run_time(worker_type, id) {
        Ok(time) => time,
        Err(err) => {
            eprintln!("could not get last run time of {worker_type} {id}: {err}");
            return false;
        }
    };

    let next = match event.compute_next_event(last) {
        Ok(Some(next)) => next,
        Ok(None) => return false,
        Err(err) => {
            eprintln!("compute_next_event for '{event_str}' failed - {err}");
            return false;
        }
    };

    let now = proxmox_time::epoch_i64();
    next <= now
}

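/// Gather filesystem usage and block device statistics for the given path;
/// ZFS-backed mounts are queried via their dataset, other filesystems via the
/// underlying block device.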
fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, name: &str) -> DiskStat {
    let usage = match proxmox_sys::fs::fs_info(path) {
        Ok(status) => Some(status),
        Err(err) => {
            eprintln!("read fs info on {path:?} failed - {err}");
            None
        }
    };

    let dev = match disk_manager.find_mounted_device(path) {
        Ok(None) => None,
        Ok(Some((fs_type, device, source))) => {
            let mut device_stat = None;
            match (fs_type.as_str(), source) {
                ("zfs", Some(source)) => match source.into_string() {
                    Ok(dataset) => match zfs_dataset_stats(&dataset) {
                        Ok(stat) => device_stat = Some(stat),
                        Err(err) => eprintln!("zfs_dataset_stats({dataset:?}) failed - {err}"),
                    },
                    Err(source) => {
                        eprintln!("zfs_dataset_stats({source:?}) failed - invalid characters")
                    }
                },
                _ => {
                    if let Ok(disk) = disk_manager.clone().disk_by_dev_num(device.into_dev_t()) {
                        match disk.read_stat() {
                            Ok(stat) => device_stat = stat,
                            Err(err) => eprintln!("disk.read_stat {path:?} failed - {err}"),
                        }
                    }
                }
            }
            device_stat
        }
        Err(err) => {
            eprintln!("find_mounted_device failed - {err}");
            None
        }
    };

    DiskStat {
        name: name.to_string(),
        usage,
        dev,
    }
}

// Rate Limiter lookup
async fn run_traffic_control_updater() {
    loop {
        let delay_target = Instant::now() + Duration::from_secs(1);

        {
            let mut cache = TRAFFIC_CONTROL_CACHE.lock().unwrap();
            cache.compute_current_rates();
        }

        tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
    }
}

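/// Look up the traffic-control rate limiters (read, write) for a peer address;
/// registered with the connection AcceptBuilder above, so it runs for every
/// new connection.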
fn lookup_rate_limiter(
    peer: std::net::SocketAddr,
) -> (Option<SharedRateLimit>, Option<SharedRateLimit>) {
    let mut cache = TRAFFIC_CONTROL_CACHE.lock().unwrap();

    let now = proxmox_time::epoch_i64();

    cache.reload(now);

    let (_rule_name, read_limiter, write_limiter) = cache.lookup_rate_limiter(peer, now);

    (read_limiter, write_limiter)
}