]> git.proxmox.com Git - proxmox-backup.git/blame - pbs-datastore/src/task_tracking.rs
pbs-datastore: clippy fixes
[proxmox-backup.git] / pbs-datastore / src / task_tracking.rs
CommitLineData
4bc84a65
HL
1use anyhow::Error;
2use libc::pid_t;
3use nix::unistd::Pid;
5fd823c3 4use std::iter::Sum;
4bc84a65
HL
5use std::path::PathBuf;
6
7use pbs_api_types::Operation;
8use proxmox_sys::fs::{file_read_optional_string, open_file_locked, replace_file, CreateOptions};
9use proxmox_sys::linux::procfs;
10use serde::{Deserialize, Serialize};
11
5fd823c3
HL
12#[derive(Deserialize, Serialize, Clone, Copy, Default)]
13pub struct ActiveOperationStats {
14 pub read: i64,
15 pub write: i64,
16}
17
18impl Sum<Self> for ActiveOperationStats {
19 fn sum<I>(iter: I) -> Self
20 where
21 I: Iterator<Item = Self>,
22 {
23 iter.fold(Self::default(), |a, b| Self {
24 read: a.read + b.read,
25 write: a.write + b.write,
26 })
27 }
28}
29
4bc84a65
HL
30#[derive(Deserialize, Serialize, Clone)]
31struct TaskOperations {
32 pid: u32,
33 starttime: u64,
5fd823c3
HL
34 active_operations: ActiveOperationStats,
35}
36
857f346c
WB
37fn open_lock_file(name: &str) -> Result<(std::fs::File, CreateOptions), Error> {
38 let user = pbs_config::backup_user()?;
39
40 let lock_path = PathBuf::from(format!("{}/{}.lock", crate::ACTIVE_OPERATIONS_DIR, name));
41
42 let options = CreateOptions::new()
43 .group(user.gid)
44 .owner(user.uid)
45 .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
46
47 let timeout = std::time::Duration::new(10, 0);
48
49 Ok((
a57413a5 50 open_file_locked(lock_path, timeout, true, options.clone())?,
857f346c
WB
51 options,
52 ))
53}
54
55/// MUST return `Some(file)` when `lock` is `true`.
56fn get_active_operations_do(
57 name: &str,
58 lock: bool,
59) -> Result<(ActiveOperationStats, Option<std::fs::File>), Error> {
5fd823c3 60 let path = PathBuf::from(format!("{}/{}", crate::ACTIVE_OPERATIONS_DIR, name));
857f346c
WB
61 let lock = if lock {
62 Some(open_lock_file(name)?.0)
63 } else {
64 None
65 };
5fd823c3 66
a57413a5 67 let data = match file_read_optional_string(path)? {
5fd823c3
HL
68 Some(data) => serde_json::from_str::<Vec<TaskOperations>>(&data)?
69 .iter()
70 .filter_map(
71 |task| match procfs::check_process_running(task.pid as pid_t) {
72 Some(stat) if task.starttime == stat.starttime => Some(task.active_operations),
73 _ => None,
74 },
75 )
76 .sum(),
77 None => ActiveOperationStats::default(),
857f346c
WB
78 };
79
80 Ok((data, lock))
81}
82
83pub fn get_active_operations(name: &str) -> Result<ActiveOperationStats, Error> {
84 Ok(get_active_operations_do(name, false)?.0)
85}
86
87pub fn get_active_operations_locked(
88 name: &str,
89) -> Result<(ActiveOperationStats, std::fs::File), Error> {
90 let (data, lock) = get_active_operations_do(name, true)?;
91 Ok((data, lock.unwrap()))
4bc84a65
HL
92}
93
94pub fn update_active_operations(name: &str, operation: Operation, count: i64) -> Result<(), Error> {
95 let path = PathBuf::from(format!("{}/{}", crate::ACTIVE_OPERATIONS_DIR, name));
4bc84a65 96
857f346c 97 let (_lock, options) = open_lock_file(name)?;
4bc84a65
HL
98
99 let pid = std::process::id();
100 let starttime = procfs::PidStat::read_from_pid(Pid::from_raw(pid as pid_t))?.starttime;
101 let mut updated = false;
102
103 let mut updated_tasks: Vec<TaskOperations> = match file_read_optional_string(&path)? {
104 Some(data) => serde_json::from_str::<Vec<TaskOperations>>(&data)?
105 .iter_mut()
106 .filter_map(
107 |task| match procfs::check_process_running(task.pid as pid_t) {
108 Some(stat) if pid == task.pid && stat.starttime != task.starttime => None,
109 Some(_) => {
110 if pid == task.pid {
111 updated = true;
112 match operation {
5fd823c3
HL
113 Operation::Read => task.active_operations.read += count,
114 Operation::Write => task.active_operations.write += count,
0408f60b 115 Operation::Lookup => (), // no IO must happen there
4bc84a65
HL
116 };
117 }
118 Some(task.clone())
119 }
120 _ => None,
121 },
122 )
123 .collect(),
124 None => Vec::new(),
125 };
126
127 if !updated {
5fd823c3
HL
128 updated_tasks.push(TaskOperations {
129 pid,
130 starttime,
131 active_operations: match operation {
132 Operation::Read => ActiveOperationStats { read: 1, write: 0 },
133 Operation::Write => ActiveOperationStats { read: 0, write: 1 },
0408f60b 134 Operation::Lookup => ActiveOperationStats { read: 0, write: 0 },
4bc84a65
HL
135 },
136 })
137 }
138 replace_file(
139 &path,
140 serde_json::to_string(&updated_tasks)?.as_bytes(),
141 options,
142 false,
143 )
144}