use std::sync::Arc;
use std::path::{Path, PathBuf};
use std::os::unix::io::AsRawFd;

use anyhow::{bail, format_err, Error};
use futures::*;
use hyper;
use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};

use proxmox::try_block;
use proxmox::api::RpcEnvironmentType;

use proxmox_backup::{
    backup::DataStore,
    server::{
        UPID,
        WorkerTask,
        ApiConfig,
        rest::*,
        jobstate::{
            self,
            Job,
        },
        rotate_task_log_archive,
    },
    tools::systemd::time::{
        parse_calendar_event,
        compute_next_event,
    },
};

use proxmox_backup::api2::types::Authid;
use proxmox_backup::configdir;
use proxmox_backup::buildcfg;
use proxmox_backup::server;
use proxmox_backup::auth_helpers::*;
use proxmox_backup::tools::{
    daemon,
    disks::{
        DiskManage,
        zfs_pool_stats,
    },
    socket::{
        set_tcp_keepalive,
        PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
    },
};

use proxmox_backup::api2::pull::do_sync_job;
use proxmox_backup::server::do_verification_job;
use proxmox_backup::server::do_prune_job;

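// Entry point: refuse to run unless we are the backup user and group, then
// hand the async `run()` future to the crate's tokio runtime helper.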
fn main() -> Result<(), Error> {
    proxmox_backup::tools::setup_safe_path_env();

    let backup_uid = proxmox_backup::backup::backup_user()?.uid;
    let backup_gid = proxmox_backup::backup::backup_group()?.gid;
    let running_uid = nix::unistd::Uid::effective();
    let running_gid = nix::unistd::Gid::effective();

    if running_uid != backup_uid || running_gid != backup_gid {
        bail!("proxy not running as backup user or group (got uid {} gid {})", running_uid, running_gid);
    }

    proxmox_backup::tools::runtime::main(run())
}

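// Daemon main loop: set up syslog, the REST API configuration and the TLS
// acceptor, serve HTTPS on port 8007, and on shutdown wait for all active
// workers before exiting.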
async fn run() -> Result<(), Error> {
    if let Err(err) = syslog::init(
        syslog::Facility::LOG_DAEMON,
        log::LevelFilter::Info,
        Some("proxmox-backup-proxy")) {
        bail!("unable to initialize syslog - {}", err);
    }

    let _ = public_auth_key(); // load with lazy_static
    let _ = csrf_secret(); // load with lazy_static

    let mut config = ApiConfig::new(
        buildcfg::JS_DIR, &proxmox_backup::api2::ROUTER, RpcEnvironmentType::PUBLIC)?;

    config.add_alias("novnc", "/usr/share/novnc-pve");
    config.add_alias("extjs", "/usr/share/javascript/extjs");
    config.add_alias("fontawesome", "/usr/share/fonts-font-awesome");
    config.add_alias("xtermjs", "/usr/share/pve-xtermjs");
    config.add_alias("locale", "/usr/share/pbs-i18n");
    config.add_alias("widgettoolkit", "/usr/share/javascript/proxmox-widget-toolkit");
    config.add_alias("css", "/usr/share/javascript/proxmox-backup/css");
    config.add_alias("docs", "/usr/share/doc/proxmox-backup/html");

    let mut indexpath = PathBuf::from(buildcfg::JS_DIR);
    indexpath.push("index.hbs");
    config.register_template("index", &indexpath)?;
    config.register_template("console", "/usr/share/pve-xtermjs/index.html.hbs")?;

    config.enable_file_log(buildcfg::API_ACCESS_LOG_FN)?;

    let rest_server = RestServer::new(config);

    // openssl req -x509 -newkey rsa:4096 -keyout /etc/proxmox-backup/proxy.key -out /etc/proxmox-backup/proxy.pem -nodes
    let key_path = configdir!("/proxy.key");
    let cert_path = configdir!("/proxy.pem");

    let mut acceptor = SslAcceptor::mozilla_intermediate_v5(SslMethod::tls()).unwrap();
    acceptor.set_private_key_file(key_path, SslFiletype::PEM)
        .map_err(|err| format_err!("unable to read proxy key {} - {}", key_path, err))?;
    acceptor.set_certificate_chain_file(cert_path)
        .map_err(|err| format_err!("unable to read proxy cert {} - {}", cert_path, err))?;
    acceptor.check_private_key().unwrap();

    let acceptor = Arc::new(acceptor.build());

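    // Accept loop: listen on the IPv6 wildcard address, port 8007; enable
    // TCP_NODELAY and keepalive on each accepted socket, then run the TLS
    // handshake. Connections whose handshake fails are silently dropped
    // via `try_filter_map` returning None.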
    let server = daemon::create_daemon(
        ([0,0,0,0,0,0,0,0], 8007).into(),
        |listener, ready| {
            let connections = proxmox_backup::tools::async_io::StaticIncoming::from(listener)
                .map_err(Error::from)
                .try_filter_map(move |(sock, _addr)| {
                    let acceptor = Arc::clone(&acceptor);
                    async move {
                        sock.set_nodelay(true).unwrap();

                        let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);

                        Ok(tokio_openssl::accept(&acceptor, sock)
                            .await
                            .ok() // handshake errors aren't fatal, so return None to filter them out
                        )
                    }
                });
            let connections = proxmox_backup::tools::async_io::HyperAccept(connections);

            Ok(ready
                .and_then(|_| hyper::Server::builder(connections)
                    .serve(rest_server)
                    .with_graceful_shutdown(server::shutdown_future())
                    .map_err(Error::from)
                )
                .map_err(|err| eprintln!("server error: {}", err))
                .map(|_| ())
            )
        },
    );
    daemon::systemd_notify(daemon::SystemdNotify::Ready)?;

    let init_result: Result<(), Error> = try_block!({
        server::create_task_control_socket()?;
        server::server_state_init()?;
        Ok(())
    });

    if let Err(err) = init_result {
        bail!("unable to start daemon - {}", err);
    }

    start_task_scheduler();
    start_stat_generator();

    server.await?;
    log::info!("server shutting down, waiting for active workers to complete");
    proxmox_backup::server::last_worker_future().await?;
    log::info!("done - exit server");

    Ok(())
}

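// The two helpers below spawn long-running background tasks; `select`
// races each task against the global shutdown future, so both stop
// automatically once the server begins to shut down.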
fn start_stat_generator() {
    let abort_future = server::shutdown_future();
    let future = Box::pin(run_stat_generator());
    let task = futures::future::select(future, abort_future);
    tokio::spawn(task.map(|_| ()));
}

fn start_task_scheduler() {
    let abort_future = server::shutdown_future();
    let future = Box::pin(run_task_scheduler());
    let task = futures::future::select(future, abort_future);
    tokio::spawn(task.map(|_| ()));
}

use std::time::{SystemTime, Instant, Duration, UNIX_EPOCH};

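// Compute the `Instant` of the next full minute by rounding the epoch
// seconds up to the next multiple of 60. For example, at epoch second
// 1_700_000_123 the next boundary is 1_700_000_160, i.e. 37s ahead.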
fn next_minute() -> Result<Instant, Error> {
    let now = SystemTime::now();
    let epoch_now = now.duration_since(UNIX_EPOCH)?;
    let epoch_next = Duration::from_secs((epoch_now.as_secs()/60 + 1)*60);
    Ok(Instant::now() + epoch_next - epoch_now)
}

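// Scheduler loop: wake up once per minute and run all due jobs. The first
// two iterations are skipped, so scheduling starts one to two minutes
// after startup, and `catch_unwind` keeps a panicking job from taking the
// whole scheduler down.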
async fn run_task_scheduler() {

    let mut count: usize = 0;

    loop {
        count += 1;

        let delay_target = match next_minute() { // try to run every minute
            Ok(d) => d,
            Err(err) => {
                eprintln!("task scheduler: compute next minute failed - {}", err);
                tokio::time::delay_until(tokio::time::Instant::from_std(Instant::now() + Duration::from_secs(60))).await;
                continue;
            }
        };

        if count > 2 { // wait 1..2 minutes before starting
            match schedule_tasks().catch_unwind().await {
                Err(panic) => {
                    match panic.downcast::<&str>() {
                        Ok(msg) => {
                            eprintln!("task scheduler panic: {}", msg);
                        }
                        Err(_) => {
                            eprintln!("task scheduler panic - unknown type");
                        }
                    }
                }
                Ok(Err(err)) => {
                    eprintln!("task scheduler failed - {:?}", err);
                }
                Ok(Ok(_)) => {}
            }
        }

        tokio::time::delay_until(tokio::time::Instant::from_std(delay_target)).await;
    }
}

async fn schedule_tasks() -> Result<(), Error> {

    schedule_datastore_garbage_collection().await;
    schedule_datastore_prune().await;
    schedule_datastore_sync_jobs().await;
    schedule_datastore_verify_jobs().await;
    schedule_task_log_rotate().await;

    Ok(())
}

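// Start garbage collection on every datastore whose `gc_schedule` calendar
// event is due. The last run time comes from the stored GC status UPID if
// present, otherwise from the job state file.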
async fn schedule_datastore_garbage_collection() {

    use proxmox_backup::config::{
        datastore::{
            self,
            DataStoreConfig,
        },
    };

    let config = match datastore::config() {
        Err(err) => {
            eprintln!("unable to read datastore config - {}", err);
            return;
        }
        Ok((config, _digest)) => config,
    };

    for (store, (_, store_config)) in config.sections {
        let datastore = match DataStore::lookup_datastore(&store) {
            Ok(datastore) => datastore,
            Err(err) => {
                eprintln!("lookup_datastore failed - {}", err);
                continue;
            }
        };

        let store_config: DataStoreConfig = match serde_json::from_value(store_config) {
            Ok(c) => c,
            Err(err) => {
                eprintln!("datastore config from_value failed - {}", err);
                continue;
            }
        };

        let event_str = match store_config.gc_schedule {
            Some(event_str) => event_str,
            None => continue,
        };

        let event = match parse_calendar_event(&event_str) {
            Ok(event) => event,
            Err(err) => {
                eprintln!("unable to parse schedule '{}' - {}", event_str, err);
                continue;
            }
        };

        if datastore.garbage_collection_running() { continue; }

        let worker_type = "garbage_collection";

        let stat = datastore.last_gc_status();
        let last = if let Some(upid_str) = stat.upid {
            match upid_str.parse::<UPID>() {
                Ok(upid) => upid.starttime,
                Err(err) => {
                    eprintln!("unable to parse upid '{}' - {}", upid_str, err);
                    continue;
                }
            }
        } else {
            match jobstate::last_run_time(worker_type, &store) {
                Ok(time) => time,
                Err(err) => {
                    eprintln!("could not get last run time of {} {}: {}", worker_type, store, err);
                    continue;
                }
            }
        };

        let next = match compute_next_event(&event, last, false) {
            Ok(Some(next)) => next,
            Ok(None) => continue,
            Err(err) => {
                eprintln!("compute_next_event for '{}' failed - {}", event_str, err);
                continue;
            }
        };

        let now = proxmox::tools::time::epoch_i64();

        if next > now { continue; }

        let job = match Job::new(worker_type, &store) {
            Ok(job) => job,
            Err(_) => continue, // could not get lock
        };

        let auth_id = Authid::backup_auth_id();

        if let Err(err) = crate::server::do_garbage_collection_job(job, datastore, auth_id, Some(event_str)) {
            eprintln!("unable to start garbage collection job on datastore {} - {}", store, err);
        }
    }
}

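// Start a prune job for every datastore with a due `prune_schedule`.
// Stores without any keep-* option are skipped, since pruning without
// limits would simply keep everything.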
async fn schedule_datastore_prune() {

    use proxmox_backup::{
        backup::{
            PruneOptions,
        },
        config::datastore::{
            self,
            DataStoreConfig,
        },
    };

    let config = match datastore::config() {
        Err(err) => {
            eprintln!("unable to read datastore config - {}", err);
            return;
        }
        Ok((config, _digest)) => config,
    };

    for (store, (_, store_config)) in config.sections {

        let store_config: DataStoreConfig = match serde_json::from_value(store_config) {
            Ok(c) => c,
            Err(err) => {
                eprintln!("datastore '{}' config from_value failed - {}", store, err);
                continue;
            }
        };

        let event_str = match store_config.prune_schedule {
            Some(event_str) => event_str,
            None => continue,
        };

        let prune_options = PruneOptions {
            keep_last: store_config.keep_last,
            keep_hourly: store_config.keep_hourly,
            keep_daily: store_config.keep_daily,
            keep_weekly: store_config.keep_weekly,
            keep_monthly: store_config.keep_monthly,
            keep_yearly: store_config.keep_yearly,
        };

        if !prune_options.keeps_something() { // no prune settings - keep all
            continue;
        }

        let worker_type = "prune";
        if check_schedule(worker_type, &event_str, &store) {
            let job = match Job::new(worker_type, &store) {
                Ok(job) => job,
                Err(_) => continue, // could not get lock
            };

            let auth_id = Authid::backup_auth_id().clone();
            if let Err(err) = do_prune_job(job, prune_options, store.clone(), &auth_id, Some(event_str)) {
                eprintln!("unable to start datastore prune job {} - {}", &store, err);
            }
        }
    }
}

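// Start each configured sync job whose schedule is due.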
async fn schedule_datastore_sync_jobs() {

    use proxmox_backup::config::sync::{
        self,
        SyncJobConfig,
    };

    let config = match sync::config() {
        Err(err) => {
            eprintln!("unable to read sync job config - {}", err);
            return;
        }
        Ok((config, _digest)) => config,
    };

    for (job_id, (_, job_config)) in config.sections {
        let job_config: SyncJobConfig = match serde_json::from_value(job_config) {
            Ok(c) => c,
            Err(err) => {
                eprintln!("sync job config from_value failed - {}", err);
                continue;
            }
        };

        let event_str = match job_config.schedule {
            Some(ref event_str) => event_str.clone(),
            None => continue,
        };

        let worker_type = "syncjob";
        if check_schedule(worker_type, &event_str, &job_id) {
            let job = match Job::new(worker_type, &job_id) {
                Ok(job) => job,
                Err(_) => continue, // could not get lock
            };

            let auth_id = Authid::backup_auth_id().clone();
            if let Err(err) = do_sync_job(job, job_config, &auth_id, Some(event_str)) {
                eprintln!("unable to start datastore sync job {} - {}", &job_id, err);
            }
        }
    }
}

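// Start each configured verification job whose schedule is due.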
async fn schedule_datastore_verify_jobs() {

    use proxmox_backup::config::verify::{
        self,
        VerificationJobConfig,
    };

    let config = match verify::config() {
        Err(err) => {
            eprintln!("unable to read verification job config - {}", err);
            return;
        }
        Ok((config, _digest)) => config,
    };
    for (job_id, (_, job_config)) in config.sections {
        let job_config: VerificationJobConfig = match serde_json::from_value(job_config) {
            Ok(c) => c,
            Err(err) => {
                eprintln!("verification job config from_value failed - {}", err);
                continue;
            }
        };
        let event_str = match job_config.schedule {
            Some(ref event_str) => event_str.clone(),
            None => continue,
        };

        let worker_type = "verificationjob";
        let auth_id = Authid::backup_auth_id().clone();
        if check_schedule(worker_type, &event_str, &job_id) {
            let job = match Job::new(worker_type, &job_id) {
                Ok(job) => job,
                Err(_) => continue, // could not get lock
            };
            if let Err(err) = do_verification_job(job, job_config, &auth_id, Some(event_str)) {
                eprintln!("unable to start datastore verification job {} - {}", &job_id, err);
            }
        }
    }
}

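// Rotate the task log archive. The job is checked like any other schedule
// (daily at 00:00), with one special case: a freshly created job state
// triggers an immediate first rotation.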
async fn schedule_task_log_rotate() {

    let worker_type = "logrotate";
    let job_id = "task_archive";

    // schedule daily at 00:00 like normal logrotate
    let schedule = "00:00";

    if !check_schedule(worker_type, schedule, job_id) {
        // if we never ran the rotation, schedule instantly
        match jobstate::JobState::load(worker_type, job_id) {
            Ok(state) => match state {
                jobstate::JobState::Created { .. } => {},
                _ => return,
            },
            _ => return,
        }
    }

    let mut job = match Job::new(worker_type, job_id) {
        Ok(job) => job,
        Err(_) => return, // could not get lock
    };

    if let Err(err) = WorkerTask::new_thread(
        worker_type,
        Some(job_id.to_string()),
        Authid::backup_auth_id().clone(),
        false,
        move |worker| {
            job.start(&worker.upid().to_string())?;
            worker.log("starting task log rotation".to_string());

            let result = try_block!({
                // rotate task log archive
                let max_size = 500000; // a normal entry is about 100 bytes, so ~5000 entries per file
                let max_files = 20; // times 20 files gives at least 100000 task entries
                let has_rotated = rotate_task_log_archive(max_size, true, Some(max_files))?;
                if has_rotated {
                    worker.log("task log archive was rotated".to_string());
                } else {
                    worker.log("task log archive was not rotated".to_string());
                }

                Ok(())
            });

            let status = worker.create_state(&result);

            if let Err(err) = job.finish(status) {
                eprintln!("could not finish job state for {}: {}", worker_type, err);
            }

            result
        },
    ) {
        eprintln!("unable to start task log rotation: {}", err);
    }
}

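// Collect host statistics every 10 seconds; every sixth iteration (roughly
// once a minute) the `save` flag is set, which, judging by the name, asks
// the RRD layer to also persist the series to disk.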
async fn run_stat_generator() {

    let mut count = 0;
    loop {
        count += 1;
        let save = if count >= 6 { count = 0; true } else { false };

        let delay_target = Instant::now() + Duration::from_secs(10);

        generate_host_stats(save).await;

        tokio::time::delay_until(tokio::time::Instant::from_std(delay_target)).await;
    }
}

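// Thin wrappers around rrd::update_value for gauge and derive data
// sources; errors are logged and otherwise ignored, so a failed RRD
// update does not disturb stat collection.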
fn rrd_update_gauge(name: &str, value: f64, save: bool) {
    use proxmox_backup::rrd;
    if let Err(err) = rrd::update_value(name, value, rrd::DST::Gauge, save) {
        eprintln!("rrd::update_value '{}' failed - {}", name, err);
    }
}

fn rrd_update_derive(name: &str, value: f64, save: bool) {
    use proxmox_backup::rrd;
    if let Err(err) = rrd::update_value(name, value, rrd::DST::Derive, save) {
        eprintln!("rrd::update_value '{}' failed - {}", name, err);
    }
}

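// Gather CPU, memory, network, load average and disk statistics and feed
// them into the RRD database. The procfs and disk helpers are synchronous,
// which is presumably why the whole body runs inside `block_in_place`.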
async fn generate_host_stats(save: bool) {
    use proxmox::sys::linux::procfs::{
        read_meminfo, read_proc_stat, read_proc_net_dev, read_loadavg};
    use proxmox_backup::config::datastore;

    proxmox_backup::tools::runtime::block_in_place(move || {

        match read_proc_stat() {
            Ok(stat) => {
                rrd_update_gauge("host/cpu", stat.cpu, save);
                rrd_update_gauge("host/iowait", stat.iowait_percent, save);
            }
            Err(err) => {
                eprintln!("read_proc_stat failed - {}", err);
            }
        }

        match read_meminfo() {
            Ok(meminfo) => {
                rrd_update_gauge("host/memtotal", meminfo.memtotal as f64, save);
                rrd_update_gauge("host/memused", meminfo.memused as f64, save);
                rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64, save);
                rrd_update_gauge("host/swapused", meminfo.swapused as f64, save);
            }
            Err(err) => {
                eprintln!("read_meminfo failed - {}", err);
            }
        }

        match read_proc_net_dev() {
            Ok(netdev) => {
                use proxmox_backup::config::network::is_physical_nic;
                let mut netin = 0;
                let mut netout = 0;
                for item in netdev {
                    if !is_physical_nic(&item.device) { continue; }
                    netin += item.receive;
                    netout += item.send;
                }
                rrd_update_derive("host/netin", netin as f64, save);
                rrd_update_derive("host/netout", netout as f64, save);
            }
            Err(err) => {
                eprintln!("read_proc_net_dev failed - {}", err);
            }
        }

        match read_loadavg() {
            Ok(loadavg) => {
                rrd_update_gauge("host/loadavg", loadavg.0 as f64, save);
            }
            Err(err) => {
                eprintln!("read_loadavg failed - {}", err);
            }
        }

        let disk_manager = DiskManage::new();

        gather_disk_stats(disk_manager.clone(), Path::new("/"), "host", save);

        match datastore::config() {
            Ok((config, _)) => {
                let datastore_list: Vec<datastore::DataStoreConfig> =
                    config.convert_to_typed_array("datastore").unwrap_or_default();

                for config in datastore_list {

                    let rrd_prefix = format!("datastore/{}", config.name);
                    let path = std::path::Path::new(&config.path);
                    gather_disk_stats(disk_manager.clone(), path, &rrd_prefix, save);
                }
            }
            Err(err) => {
                eprintln!("read datastore config failed - {}", err);
            }
        }
    });
}

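// Returns true if the calendar event is due, i.e. the next trigger time
// computed from the job's last run lies in the past (or is right now).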
fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
    let event = match parse_calendar_event(event_str) {
        Ok(event) => event,
        Err(err) => {
            eprintln!("unable to parse schedule '{}' - {}", event_str, err);
            return false;
        }
    };

    let last = match jobstate::last_run_time(worker_type, id) {
        Ok(time) => time,
        Err(err) => {
            eprintln!("could not get last run time of {} {}: {}", worker_type, id, err);
            return false;
        }
    };

    let next = match compute_next_event(&event, last, false) {
        Ok(Some(next)) => next,
        Ok(None) => return false,
        Err(err) => {
            eprintln!("compute_next_event for '{}' failed - {}", event_str, err);
            return false;
        }
    };

    let now = proxmox::tools::time::epoch_i64();
    next <= now
}

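// Record filesystem usage (total/used) and, where a backing device can be
// determined, I/O counters for `path` under the given RRD prefix. ZFS
// pools are queried via zfs_pool_stats, other filesystems via the block
// device's stat; sector counts are converted to bytes (512-byte sectors).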
fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &str, save: bool) {

    match proxmox_backup::tools::disks::disk_usage(path) {
        Ok(status) => {
            let rrd_key = format!("{}/total", rrd_prefix);
            rrd_update_gauge(&rrd_key, status.total as f64, save);
            let rrd_key = format!("{}/used", rrd_prefix);
            rrd_update_gauge(&rrd_key, status.used as f64, save);
        }
        Err(err) => {
            eprintln!("read disk_usage on {:?} failed - {}", path, err);
        }
    }

    match disk_manager.find_mounted_device(path) {
        Ok(None) => {},
        Ok(Some((fs_type, device, source))) => {
            let mut device_stat = None;
            match fs_type.as_str() {
                "zfs" => {
                    if let Some(pool) = source {
                        match zfs_pool_stats(&pool) {
                            Ok(stat) => device_stat = stat,
                            Err(err) => eprintln!("zfs_pool_stats({:?}) failed - {}", pool, err),
                        }
                    }
                }
                _ => {
                    if let Ok(disk) = disk_manager.clone().disk_by_dev_num(device.into_dev_t()) {
                        match disk.read_stat() {
                            Ok(stat) => device_stat = stat,
                            Err(err) => eprintln!("disk.read_stat {:?} failed - {}", path, err),
                        }
                    }
                }
            }
            if let Some(stat) = device_stat {
                let rrd_key = format!("{}/read_ios", rrd_prefix);
                rrd_update_derive(&rrd_key, stat.read_ios as f64, save);
                let rrd_key = format!("{}/read_bytes", rrd_prefix);
                rrd_update_derive(&rrd_key, (stat.read_sectors * 512) as f64, save);

                let rrd_key = format!("{}/write_ios", rrd_prefix);
                rrd_update_derive(&rrd_key, stat.write_ios as f64, save);
                let rrd_key = format!("{}/write_bytes", rrd_prefix);
                rrd_update_derive(&rrd_key, (stat.write_sectors * 512) as f64, save);

                let rrd_key = format!("{}/io_ticks", rrd_prefix);
                rrd_update_derive(&rrd_key, (stat.io_ticks as f64) / 1000.0, save);
            }
        }
        Err(err) => {
            eprintln!("find_mounted_device failed - {}", err);
        }
    }
}