5 use std
::path
::PathBuf
;
7 use pbs_api_types
::Operation
;
8 use proxmox_sys
::fs
::{file_read_optional_string, open_file_locked, replace_file, CreateOptions}
;
9 use proxmox_sys
::linux
::procfs
;
10 use serde
::{Deserialize, Serialize}
;
12 #[derive(Deserialize, Serialize, Clone, Copy, Default)]
13 pub struct ActiveOperationStats
{
18 impl Sum
<Self> for ActiveOperationStats
{
19 fn sum
<I
>(iter
: I
) -> Self
21 I
: Iterator
<Item
= Self>,
23 iter
.fold(Self::default(), |a
, b
| Self {
24 read
: a
.read
+ b
.read
,
25 write
: a
.write
+ b
.write
,
30 #[derive(Deserialize, Serialize, Clone)]
31 struct TaskOperations
{
34 active_operations
: ActiveOperationStats
,
37 fn open_lock_file(name
: &str) -> Result
<(std
::fs
::File
, CreateOptions
), Error
> {
38 let user
= pbs_config
::backup_user()?
;
40 let lock_path
= PathBuf
::from(format
!("{}/{}.lock", crate::ACTIVE_OPERATIONS_DIR
, name
));
42 let options
= CreateOptions
::new()
45 .perm(nix
::sys
::stat
::Mode
::from_bits_truncate(0o660));
47 let timeout
= std
::time
::Duration
::new(10, 0);
50 open_file_locked(lock_path
, timeout
, true, options
.clone())?
,
55 /// MUST return `Some(file)` when `lock` is `true`.
56 fn get_active_operations_do(
59 ) -> Result
<(ActiveOperationStats
, Option
<std
::fs
::File
>), Error
> {
60 let path
= PathBuf
::from(format
!("{}/{}", crate::ACTIVE_OPERATIONS_DIR
, name
));
62 Some(open_lock_file(name
)?
.0)
67 let data
= match file_read_optional_string(path
)?
{
68 Some(data
) => serde_json
::from_str
::<Vec
<TaskOperations
>>(&data
)?
71 |task
| match procfs
::check_process_running(task
.pid
as pid_t
) {
72 Some(stat
) if task
.starttime
== stat
.starttime
=> Some(task
.active_operations
),
77 None
=> ActiveOperationStats
::default(),
83 pub fn get_active_operations(name
: &str) -> Result
<ActiveOperationStats
, Error
> {
84 Ok(get_active_operations_do(name
, false)?
.0)
87 pub fn get_active_operations_locked(
89 ) -> Result
<(ActiveOperationStats
, std
::fs
::File
), Error
> {
90 let (data
, lock
) = get_active_operations_do(name
, true)?
;
91 Ok((data
, lock
.unwrap()))
94 pub fn update_active_operations(name
: &str, operation
: Operation
, count
: i64) -> Result
<(), Error
> {
95 let path
= PathBuf
::from(format
!("{}/{}", crate::ACTIVE_OPERATIONS_DIR
, name
));
97 let (_lock
, options
) = open_lock_file(name
)?
;
99 let pid
= std
::process
::id();
100 let starttime
= procfs
::PidStat
::read_from_pid(Pid
::from_raw(pid
as pid_t
))?
.starttime
;
101 let mut updated
= false;
103 let mut updated_tasks
: Vec
<TaskOperations
> = match file_read_optional_string(&path
)?
{
104 Some(data
) => serde_json
::from_str
::<Vec
<TaskOperations
>>(&data
)?
107 |task
| match procfs
::check_process_running(task
.pid
as pid_t
) {
108 Some(stat
) if pid
== task
.pid
&& stat
.starttime
!= task
.starttime
=> None
,
113 Operation
::Read
=> task
.active_operations
.read
+= count
,
114 Operation
::Write
=> task
.active_operations
.write
+= count
,
115 Operation
::Lookup
=> (), // no IO must happen there
128 updated_tasks
.push(TaskOperations
{
131 active_operations
: match operation
{
132 Operation
::Read
=> ActiveOperationStats { read: 1, write: 0 }
,
133 Operation
::Write
=> ActiveOperationStats { read: 0, write: 1 }
,
134 Operation
::Lookup
=> ActiveOperationStats { read: 0, write: 0 }
,
140 serde_json
::to_string(&updated_tasks
)?
.as_bytes(),