]>
Commit | Line | Data |
---|---|---|
4bc84a65 HL |
1 | use anyhow::Error; |
2 | use libc::pid_t; | |
3 | use nix::unistd::Pid; | |
5fd823c3 | 4 | use std::iter::Sum; |
4bc84a65 HL |
5 | use std::path::PathBuf; |
6 | ||
7 | use pbs_api_types::Operation; | |
8 | use proxmox_sys::fs::{file_read_optional_string, open_file_locked, replace_file, CreateOptions}; | |
9 | use proxmox_sys::linux::procfs; | |
10 | use serde::{Deserialize, Serialize}; | |
11 | ||
5fd823c3 HL |
12 | #[derive(Deserialize, Serialize, Clone, Copy, Default)] |
13 | pub struct ActiveOperationStats { | |
14 | pub read: i64, | |
15 | pub write: i64, | |
16 | } | |
17 | ||
18 | impl Sum<Self> for ActiveOperationStats { | |
19 | fn sum<I>(iter: I) -> Self | |
20 | where | |
21 | I: Iterator<Item = Self>, | |
22 | { | |
23 | iter.fold(Self::default(), |a, b| Self { | |
24 | read: a.read + b.read, | |
25 | write: a.write + b.write, | |
26 | }) | |
27 | } | |
28 | } | |
29 | ||
4bc84a65 HL |
30 | #[derive(Deserialize, Serialize, Clone)] |
31 | struct TaskOperations { | |
32 | pid: u32, | |
33 | starttime: u64, | |
5fd823c3 HL |
34 | active_operations: ActiveOperationStats, |
35 | } | |
36 | ||
857f346c WB |
37 | fn open_lock_file(name: &str) -> Result<(std::fs::File, CreateOptions), Error> { |
38 | let user = pbs_config::backup_user()?; | |
39 | ||
40 | let lock_path = PathBuf::from(format!("{}/{}.lock", crate::ACTIVE_OPERATIONS_DIR, name)); | |
41 | ||
42 | let options = CreateOptions::new() | |
43 | .group(user.gid) | |
44 | .owner(user.uid) | |
45 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); | |
46 | ||
47 | let timeout = std::time::Duration::new(10, 0); | |
48 | ||
49 | Ok(( | |
a57413a5 | 50 | open_file_locked(lock_path, timeout, true, options.clone())?, |
857f346c WB |
51 | options, |
52 | )) | |
53 | } | |
54 | ||
55 | /// MUST return `Some(file)` when `lock` is `true`. | |
56 | fn get_active_operations_do( | |
57 | name: &str, | |
58 | lock: bool, | |
59 | ) -> Result<(ActiveOperationStats, Option<std::fs::File>), Error> { | |
5fd823c3 | 60 | let path = PathBuf::from(format!("{}/{}", crate::ACTIVE_OPERATIONS_DIR, name)); |
857f346c WB |
61 | let lock = if lock { |
62 | Some(open_lock_file(name)?.0) | |
63 | } else { | |
64 | None | |
65 | }; | |
5fd823c3 | 66 | |
a57413a5 | 67 | let data = match file_read_optional_string(path)? { |
5fd823c3 HL |
68 | Some(data) => serde_json::from_str::<Vec<TaskOperations>>(&data)? |
69 | .iter() | |
70 | .filter_map( | |
71 | |task| match procfs::check_process_running(task.pid as pid_t) { | |
72 | Some(stat) if task.starttime == stat.starttime => Some(task.active_operations), | |
73 | _ => None, | |
74 | }, | |
75 | ) | |
76 | .sum(), | |
77 | None => ActiveOperationStats::default(), | |
857f346c WB |
78 | }; |
79 | ||
80 | Ok((data, lock)) | |
81 | } | |
82 | ||
83 | pub fn get_active_operations(name: &str) -> Result<ActiveOperationStats, Error> { | |
84 | Ok(get_active_operations_do(name, false)?.0) | |
85 | } | |
86 | ||
87 | pub fn get_active_operations_locked( | |
88 | name: &str, | |
89 | ) -> Result<(ActiveOperationStats, std::fs::File), Error> { | |
90 | let (data, lock) = get_active_operations_do(name, true)?; | |
91 | Ok((data, lock.unwrap())) | |
4bc84a65 HL |
92 | } |
93 | ||
94 | pub fn update_active_operations(name: &str, operation: Operation, count: i64) -> Result<(), Error> { | |
95 | let path = PathBuf::from(format!("{}/{}", crate::ACTIVE_OPERATIONS_DIR, name)); | |
4bc84a65 | 96 | |
857f346c | 97 | let (_lock, options) = open_lock_file(name)?; |
4bc84a65 HL |
98 | |
99 | let pid = std::process::id(); | |
100 | let starttime = procfs::PidStat::read_from_pid(Pid::from_raw(pid as pid_t))?.starttime; | |
101 | let mut updated = false; | |
102 | ||
103 | let mut updated_tasks: Vec<TaskOperations> = match file_read_optional_string(&path)? { | |
104 | Some(data) => serde_json::from_str::<Vec<TaskOperations>>(&data)? | |
105 | .iter_mut() | |
106 | .filter_map( | |
107 | |task| match procfs::check_process_running(task.pid as pid_t) { | |
108 | Some(stat) if pid == task.pid && stat.starttime != task.starttime => None, | |
109 | Some(_) => { | |
110 | if pid == task.pid { | |
111 | updated = true; | |
112 | match operation { | |
5fd823c3 HL |
113 | Operation::Read => task.active_operations.read += count, |
114 | Operation::Write => task.active_operations.write += count, | |
0408f60b | 115 | Operation::Lookup => (), // no IO must happen there |
4bc84a65 HL |
116 | }; |
117 | } | |
118 | Some(task.clone()) | |
119 | } | |
120 | _ => None, | |
121 | }, | |
122 | ) | |
123 | .collect(), | |
124 | None => Vec::new(), | |
125 | }; | |
126 | ||
127 | if !updated { | |
5fd823c3 HL |
128 | updated_tasks.push(TaskOperations { |
129 | pid, | |
130 | starttime, | |
131 | active_operations: match operation { | |
132 | Operation::Read => ActiveOperationStats { read: 1, write: 0 }, | |
133 | Operation::Write => ActiveOperationStats { read: 0, write: 1 }, | |
0408f60b | 134 | Operation::Lookup => ActiveOperationStats { read: 0, write: 0 }, |
4bc84a65 HL |
135 | }, |
136 | }) | |
137 | } | |
138 | replace_file( | |
139 | &path, | |
140 | serde_json::to_string(&updated_tasks)?.as_bytes(), | |
141 | options, | |
142 | false, | |
143 | ) | |
144 | } |