1 use std
::collections
::{HashMap, VecDeque}
;
4 use std
::io
::{Read, Write, BufRead, BufReader}
;
5 use std
::panic
::UnwindSafe
;
6 use std
::sync
::atomic
::{AtomicBool, Ordering}
;
7 use std
::sync
::{Arc, Mutex}
;
9 use anyhow
::{bail, format_err, Error}
;
11 use lazy_static
::lazy_static
;
13 use serde_json
::{json, Value}
;
14 use serde
::{Serialize, Deserialize}
;
15 use tokio
::sync
::oneshot
;
17 use proxmox
::sys
::linux
::procfs
;
18 use proxmox
::try_block
;
19 use proxmox
::tools
::fs
::{create_path, open_file_locked, replace_file, CreateOptions}
;
23 use crate::tools
::logrotate
::{LogRotate, LogRotateFiles}
;
24 use crate::tools
::{FileLogger, FileLogOptions}
;
25 use crate::api2
::types
::Userid
;
27 macro_rules
! PROXMOX_BACKUP_VAR_RUN_DIR_M { () => ("/run/proxmox-backup") }
28 macro_rules
! PROXMOX_BACKUP_LOG_DIR_M { () => ("/var/log/proxmox-backup") }
29 macro_rules
! PROXMOX_BACKUP_TASK_DIR_M { () => (concat!( PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks")) }
31 pub const PROXMOX_BACKUP_VAR_RUN_DIR
: &str = PROXMOX_BACKUP_VAR_RUN_DIR_M
!();
32 pub const PROXMOX_BACKUP_LOG_DIR
: &str = PROXMOX_BACKUP_LOG_DIR_M
!();
33 pub const PROXMOX_BACKUP_TASK_DIR
: &str = PROXMOX_BACKUP_TASK_DIR_M
!();
34 pub const PROXMOX_BACKUP_TASK_LOCK_FN
: &str = concat
!(PROXMOX_BACKUP_TASK_DIR_M
!(), "/.active.lock");
35 pub const PROXMOX_BACKUP_ACTIVE_TASK_FN
: &str = concat
!(PROXMOX_BACKUP_TASK_DIR_M
!(), "/active");
36 pub const PROXMOX_BACKUP_INDEX_TASK_FN
: &str = concat
!(PROXMOX_BACKUP_TASK_DIR_M
!(), "/index");
37 pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN
: &str = concat
!(PROXMOX_BACKUP_TASK_DIR_M
!(), "/archive");
39 const MAX_INDEX_TASKS
: usize = 1000;
42 static ref WORKER_TASK_LIST
: Mutex
<HashMap
<usize, Arc
<WorkerTask
>>> = Mutex
::new(HashMap
::new());
44 static ref MY_PID
: i32 = unsafe { libc::getpid() }
;
45 static ref MY_PID_PSTART
: u64 = procfs
::PidStat
::read_from_pid(Pid
::from_raw(*MY_PID
))
50 /// Test if the task is still running
51 pub async
fn worker_is_active(upid
: &UPID
) -> Result
<bool
, Error
> {
52 if (upid
.pid
== *MY_PID
) && (upid
.pstart
== *MY_PID_PSTART
) {
53 return Ok(WORKER_TASK_LIST
.lock().unwrap().contains_key(&upid
.task_id
));
56 if !procfs
::check_process_running_pstart(upid
.pid
, upid
.pstart
).is_some() {
60 let socketname
= format
!(
61 "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR
, upid
.pid
);
65 "upid": upid
.to_string(),
68 let status
= super::send_command(socketname
, cmd
).await?
;
70 if let Some(active
) = status
.as_bool() {
73 bail
!("got unexpected result {:?} (expected bool)", status
);
77 /// Test if the task is still running (fast but inaccurate implementation)
79 /// If the task is spanned from a different process, we simply return if
80 /// that process is still running. This information is good enough to detect
82 pub fn worker_is_active_local(upid
: &UPID
) -> bool
{
83 if (upid
.pid
== *MY_PID
) && (upid
.pstart
== *MY_PID_PSTART
) {
84 WORKER_TASK_LIST
.lock().unwrap().contains_key(&upid
.task_id
)
86 procfs
::check_process_running_pstart(upid
.pid
, upid
.pstart
).is_some()
90 pub fn create_task_control_socket() -> Result
<(), Error
> {
92 let socketname
= format
!(
93 "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR
, *MY_PID
);
95 let control_future
= super::create_control_socket(socketname
, |param
| {
98 .ok_or_else(|| format_err
!("unable to parse parameters (expected json object)"))?
;
99 if param
.keys().count() != 2 { bail!("wrong number of parameters"); }
101 let command
= param
["command"]
103 .ok_or_else(|| format_err
!("unable to parse parameters (missing command)"))?
;
105 // we have only two commands for now
106 if !(command
== "abort-task" || command
== "status") {
107 bail
!("got unknown command '{}'", command
);
110 let upid_str
= param
["upid"]
112 .ok_or_else(|| format_err
!("unable to parse parameters (missing upid)"))?
;
114 let upid
= upid_str
.parse
::<UPID
>()?
;
116 if !(upid
.pid
== *MY_PID
&& upid
.pstart
== *MY_PID_PSTART
) {
117 bail
!("upid does not belong to this process");
120 let hash
= WORKER_TASK_LIST
.lock().unwrap();
124 if let Some(ref worker
) = hash
.get(&upid
.task_id
) {
125 worker
.request_abort();
127 // assume task is already stopped
132 let active
= hash
.contains_key(&upid
.task_id
);
136 bail
!("got unknown command '{}'", command
);
141 tokio
::spawn(control_future
);
146 pub fn abort_worker_async(upid
: UPID
) {
147 tokio
::spawn(async
move {
148 if let Err(err
) = abort_worker(upid
).await
{
149 eprintln
!("abort worker failed - {}", err
);
154 pub async
fn abort_worker(upid
: UPID
) -> Result
<(), Error
> {
156 let target_pid
= upid
.pid
;
158 let socketname
= format
!(
159 "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR
, target_pid
);
162 "command": "abort-task",
163 "upid": upid
.to_string(),
166 super::send_command(socketname
, cmd
).map_ok(|_
| ()).await
169 fn parse_worker_status_line(line
: &str) -> Result
<(String
, UPID
, Option
<TaskState
>), Error
> {
171 let data
= line
.splitn(3, ' '
).collect
::<Vec
<&str>>();
173 let len
= data
.len();
176 1 => Ok((data
[0].to_owned(), data
[0].parse
::<UPID
>()?
, None
)),
178 let endtime
= i64::from_str_radix(data
[1], 16)?
;
179 let state
= TaskState
::from_endtime_and_message(endtime
, data
[2])?
;
180 Ok((data
[0].to_owned(), data
[0].parse
::<UPID
>()?
, Some(state
)))
182 _
=> bail
!("wrong number of components"),
186 /// Create task log directory with correct permissions
187 pub fn create_task_log_dirs() -> Result
<(), Error
> {
190 let backup_user
= crate::backup
::backup_user()?
;
191 let opts
= CreateOptions
::new()
192 .owner(backup_user
.uid
)
193 .group(backup_user
.gid
);
195 create_path(PROXMOX_BACKUP_LOG_DIR
, None
, Some(opts
.clone()))?
;
196 create_path(PROXMOX_BACKUP_TASK_DIR
, None
, Some(opts
.clone()))?
;
197 create_path(PROXMOX_BACKUP_VAR_RUN_DIR
, None
, Some(opts
))?
;
199 }).map_err(|err
: Error
| format_err
!("unable to create task log dir - {}", err
))?
;
204 /// Read endtime (time of last log line) and exitstatus from task log file
205 /// If there is not a single line with at valid datetime, we assume the
206 /// starttime to be the endtime
207 pub fn upid_read_status(upid
: &UPID
) -> Result
<TaskState
, Error
> {
209 let mut status
= TaskState
::Unknown { endtime: upid.starttime }
;
211 let path
= upid
.log_path();
213 let mut file
= File
::open(path
)?
;
215 /// speedup - only read tail
217 use std
::io
::SeekFrom
;
218 let _
= file
.seek(SeekFrom
::End(-8192)); // ignore errors
220 let mut data
= Vec
::with_capacity(8192);
221 file
.read_to_end(&mut data
)?
;
223 // task logs should end with newline, we do not want it here
224 if data
.len() > 0 && data
[data
.len()-1] == b'
\n'
{
230 for pos
in (0..data
.len()).rev() {
231 if data
[pos
] == b'
\n'
{
232 start
= data
.len().min(pos
+ 1);
239 let last_line
= std
::str::from_utf8(last_line
)
240 .map_err(|err
| format_err
!("upid_read_status: utf8 parse failed: {}", err
))?
;
242 let mut iter
= last_line
.splitn(2, ": ");
243 if let Some(time_str
) = iter
.next() {
244 if let Ok(endtime
) = proxmox
::tools
::time
::parse_rfc3339(time_str
) {
245 if let Some(rest
) = iter
.next().and_then(|rest
| rest
.strip_prefix("TASK ")) {
246 if let Ok(state
) = TaskState
::from_endtime_and_message(endtime
, rest
) {
257 #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
259 /// The Task ended with an undefined state
260 Unknown { endtime: i64 }
,
261 /// The Task ended and there were no errors or warnings
263 /// The Task had 'count' amount of warnings and no errors
264 Warning { count: u64, endtime: i64 }
,
265 /// The Task ended with the error described in 'message'
266 Error { message: String, endtime: i64 }
,
270 pub fn endtime(&self) -> i64 {
272 TaskState
::Unknown { endtime }
=> endtime
,
273 TaskState
::OK { endtime }
=> endtime
,
274 TaskState
::Warning { endtime, .. }
=> endtime
,
275 TaskState
::Error { endtime, .. }
=> endtime
,
279 fn result_text(&self) -> String
{
281 TaskState
::Error { message, .. }
=> format
!("TASK ERROR: {}", message
),
282 other
=> format
!("TASK {}", other
),
286 fn from_endtime_and_message(endtime
: i64, s
: &str) -> Result
<Self, Error
> {
288 Ok(TaskState
::Unknown { endtime }
)
289 } else if s
== "OK" {
290 Ok(TaskState
::OK { endtime }
)
291 } else if s
.starts_with("WARNINGS: ") {
292 let count
: u64 = s
[10..].parse()?
;
293 Ok(TaskState
::Warning{ count, endtime }
)
294 } else if s
.len() > 0 {
295 let message
= if s
.starts_with("ERROR: ") { &s[7..] }
else { s }
.to_string();
296 Ok(TaskState
::Error{ message, endtime }
)
298 bail
!("unable to parse Task Status '{}'", s
);
303 impl std
::cmp
::PartialOrd
for TaskState
{
304 fn partial_cmp(&self, other
: &Self) -> Option
<std
::cmp
::Ordering
> {
305 Some(self.endtime().cmp(&other
.endtime()))
309 impl std
::cmp
::Ord
for TaskState
{
310 fn cmp(&self, other
: &Self) -> std
::cmp
::Ordering
{
311 self.endtime().cmp(&other
.endtime())
315 impl std
::fmt
::Display
for TaskState
{
316 fn fmt(&self, f
: &mut std
::fmt
::Formatter
<'_
>) -> std
::fmt
::Result
{
318 TaskState
::Unknown { .. }
=> write
!(f
, "unknown"),
319 TaskState
::OK { .. }
=> write
!(f
, "OK"),
320 TaskState
::Warning { count, .. }
=> write
!(f
, "WARNINGS: {}", count
),
321 TaskState
::Error { message, .. }
=> write
!(f
, "{}", message
),
326 /// Task details including parsed UPID
328 /// If there is no `state`, the task is still running.
330 pub struct TaskListInfo
{
333 /// UPID string representation
334 pub upid_str
: String
,
335 /// Task `(endtime, status)` if already finished
336 pub state
: Option
<TaskState
>, // endtime, status
339 fn lock_task_list_files(exclusive
: bool
) -> Result
<std
::fs
::File
, Error
> {
340 let backup_user
= crate::backup
::backup_user()?
;
342 let lock
= open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN
, std
::time
::Duration
::new(10, 0), exclusive
)?
;
343 nix
::unistd
::chown(PROXMOX_BACKUP_TASK_LOCK_FN
, Some(backup_user
.uid
), Some(backup_user
.gid
))?
;
348 /// checks if the Task Archive is bigger that 'size_threshold' bytes, and
349 /// rotates it if it is
350 pub fn rotate_task_log_archive(size_threshold
: u64, compress
: bool
, max_files
: Option
<usize>) -> Result
<bool
, Error
> {
351 let _lock
= lock_task_list_files(true)?
;
352 let path
= Path
::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN
);
353 let metadata
= match path
.metadata() {
354 Ok(metadata
) => metadata
,
355 Err(err
) if err
.kind() == std
::io
::ErrorKind
::NotFound
=> return Ok(false),
356 Err(err
) => bail
!("unable to open task archive - {}", err
),
359 if metadata
.len() > size_threshold
{
360 let mut logrotate
= LogRotate
::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN
, compress
).ok_or_else(|| format_err
!("could not get archive file names"))?
;
361 let backup_user
= crate::backup
::backup_user()?
;
364 .owner(backup_user
.uid
)
365 .group(backup_user
.gid
),
374 // atomically read/update the task list, update status of finished tasks
375 // new_upid is added to the list when specified.
376 fn update_active_workers(new_upid
: Option
<&UPID
>) -> Result
<(), Error
> {
378 let backup_user
= crate::backup
::backup_user()?
;
380 let lock
= lock_task_list_files(true)?
;
382 let mut finish_list
: Vec
<TaskListInfo
> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN
)?
;
383 let mut active_list
: Vec
<TaskListInfo
> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN
)?
386 if info
.state
.is_some() {
387 // this can happen when the active file still includes finished tasks
388 finish_list
.push(info
);
392 if !worker_is_active_local(&info
.upid
) {
393 println
!("Detected stopped UPID {}", &info
.upid_str
);
394 let now
= proxmox
::tools
::time
::epoch_i64();
395 let status
= upid_read_status(&info
.upid
)
396 .unwrap_or_else(|_
| TaskState
::Unknown { endtime: now }
);
397 finish_list
.push(TaskListInfo
{
399 upid_str
: info
.upid_str
,
408 if let Some(upid
) = new_upid
{
409 active_list
.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None }
);
412 let active_raw
= render_task_list(&active_list
);
415 PROXMOX_BACKUP_ACTIVE_TASK_FN
,
416 active_raw
.as_bytes(),
418 .owner(backup_user
.uid
)
419 .group(backup_user
.gid
),
422 finish_list
.sort_unstable_by(|a
, b
| {
423 match (&a
.state
, &b
.state
) {
424 (Some(s1
), Some(s2
)) => s1
.cmp(&s2
),
425 (Some(_
), None
) => std
::cmp
::Ordering
::Less
,
426 (None
, Some(_
)) => std
::cmp
::Ordering
::Greater
,
427 _
=> a
.upid
.starttime
.cmp(&b
.upid
.starttime
),
432 let start
= if finish_list
.len() > MAX_INDEX_TASKS
{
433 finish_list
.len() - MAX_INDEX_TASKS
438 let end
= (start
+MAX_INDEX_TASKS
).min(finish_list
.len());
440 let index_raw
= if end
> start
{
441 render_task_list(&finish_list
[start
..end
])
447 PROXMOX_BACKUP_INDEX_TASK_FN
,
448 index_raw
.as_bytes(),
450 .owner(backup_user
.uid
)
451 .group(backup_user
.gid
),
454 if !finish_list
.is_empty() && start
> 0 {
455 match std
::fs
::OpenOptions
::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN
) {
457 for info
in &finish_list
[0..start
] {
458 writer
.write_all(render_task_line(&info
).as_bytes())?
;
461 Err(err
) => bail
!("could not write task archive - {}", err
),
464 nix
::unistd
::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN
, Some(backup_user
.uid
), Some(backup_user
.gid
))?
;
472 fn render_task_line(info
: &TaskListInfo
) -> String
{
473 let mut raw
= String
::new();
474 if let Some(status
) = &info
.state
{
475 raw
.push_str(&format
!("{} {:08X} {}\n", info
.upid_str
, status
.endtime(), status
));
477 raw
.push_str(&info
.upid_str
);
484 fn render_task_list(list
: &[TaskListInfo
]) -> String
{
485 let mut raw
= String
::new();
487 raw
.push_str(&render_task_line(&info
));
492 // note this is not locked, caller has to make sure it is
493 // this will skip (and log) lines that are not valid status lines
494 fn read_task_file
<R
: Read
>(reader
: R
) -> Result
<Vec
<TaskListInfo
>, Error
>
496 let reader
= BufReader
::new(reader
);
497 let mut list
= Vec
::new();
498 for line
in reader
.lines() {
500 match parse_worker_status_line(&line
) {
501 Ok((upid_str
, upid
, state
)) => list
.push(TaskListInfo
{
507 eprintln
!("unable to parse worker status '{}' - {}", line
, err
);
516 // note this is not locked, caller has to make sure it is
517 fn read_task_file_from_path
<P
>(path
: P
) -> Result
<Vec
<TaskListInfo
>, Error
>
519 P
: AsRef
<std
::path
::Path
> + std
::fmt
::Debug
,
521 let file
= match File
::open(&path
) {
523 Err(err
) if err
.kind() == std
::io
::ErrorKind
::NotFound
=> return Ok(Vec
::new()),
524 Err(err
) => bail
!("unable to open task list {:?} - {}", path
, err
),
537 pub struct TaskListInfoIterator
{
538 list
: VecDeque
<TaskListInfo
>,
540 archive
: Option
<LogRotateFiles
>,
544 impl TaskListInfoIterator
{
545 pub fn new(active_only
: bool
) -> Result
<Self, Error
> {
546 let (read_lock
, active_list
) = {
547 let lock
= lock_task_list_files(false)?
;
548 let active_list
= read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN
)?
;
550 let needs_update
= active_list
552 .any(|info
| info
.state
.is_some() || !worker_is_active_local(&info
.upid
));
556 update_active_workers(None
)?
;
557 let lock
= lock_task_list_files(false)?
;
558 let active_list
= read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN
)?
;
565 let archive
= if active_only
{
568 let logrotate
= LogRotate
::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN
, true)
569 .ok_or_else(|| format_err
!("could not get archive file names"))?
;
570 Some(logrotate
.files())
573 let file
= if active_only { TaskFile::End }
else { TaskFile::Active }
;
574 let lock
= if active_only { None }
else { Some(read_lock) }
;
577 list
: active_list
.into(),
585 impl Iterator
for TaskListInfoIterator
{
586 type Item
= Result
<TaskListInfo
, Error
>;
588 fn next(&mut self) -> Option
<Self::Item
> {
590 if let Some(element
) = self.list
.pop_back() {
591 return Some(Ok(element
));
594 TaskFile
::Active
=> {
595 let index
= match read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN
) {
597 Err(err
) => return Some(Err(err
)),
599 self.list
.append(&mut index
.into());
600 self.file
= TaskFile
::Index
;
602 TaskFile
::Index
| TaskFile
::Archive
=> {
603 if let Some(mut archive
) = self.archive
.take() {
604 if let Some(file
) = archive
.next() {
605 let list
= match read_task_file(file
) {
607 Err(err
) => return Some(Err(err
)),
609 self.list
.append(&mut list
.into());
610 self.archive
= Some(archive
);
611 self.file
= TaskFile
::Archive
;
615 self.file
= TaskFile
::End
;
619 TaskFile
::End
=> return None
,
626 /// Launch long running worker tasks.
628 /// A worker task can either be a whole thread, or a simply tokio
629 /// task/future. Each task can `log()` messages, which are stored
630 /// persistently to files. Task should poll the `abort_requested`
631 /// flag, and stop execution when requested.
633 pub struct WorkerTask
{
635 data
: Mutex
<WorkerTaskData
>,
636 abort_requested
: AtomicBool
,
639 impl std
::fmt
::Display
for WorkerTask
{
641 fn fmt(&self, f
: &mut std
::fmt
::Formatter
) -> std
::fmt
::Result
{
647 struct WorkerTaskData
{
649 progress
: f64, // 0..1
651 pub abort_listeners
: Vec
<oneshot
::Sender
<()>>,
654 impl Drop
for WorkerTask
{
657 println
!("unregister worker");
663 pub fn new(worker_type
: &str, worker_id
: Option
<String
>, userid
: Userid
, to_stdout
: bool
) -> Result
<Arc
<Self>, Error
> {
664 println
!("register worker");
666 let upid
= UPID
::new(worker_type
, worker_id
, userid
)?
;
667 let task_id
= upid
.task_id
;
669 let mut path
= std
::path
::PathBuf
::from(PROXMOX_BACKUP_TASK_DIR
);
671 path
.push(format
!("{:02X}", upid
.pstart
% 256));
673 let backup_user
= crate::backup
::backup_user()?
;
675 create_path(&path
, None
, Some(CreateOptions
::new().owner(backup_user
.uid
).group(backup_user
.gid
)))?
;
677 path
.push(upid
.to_string());
679 println
!("FILE: {:?}", path
);
681 let logger_options
= FileLogOptions
{
682 to_stdout
: to_stdout
,
688 let logger
= FileLogger
::new(&path
, logger_options
)?
;
689 nix
::unistd
::chown(&path
, Some(backup_user
.uid
), Some(backup_user
.gid
))?
;
691 let worker
= Arc
::new(Self {
693 abort_requested
: AtomicBool
::new(false),
694 data
: Mutex
::new(WorkerTaskData
{
698 abort_listeners
: vec
![],
702 // scope to drop the lock again after inserting
704 let mut hash
= WORKER_TASK_LIST
.lock().unwrap();
705 hash
.insert(task_id
, worker
.clone());
706 super::set_worker_count(hash
.len());
709 update_active_workers(Some(&upid
))?
;
714 /// Spawn a new tokio task/future.
717 worker_id
: Option
<String
>,
721 ) -> Result
<String
, Error
>
722 where F
: Send
+ '
static + FnOnce(Arc
<WorkerTask
>) -> T
,
723 T
: Send
+ '
static + Future
<Output
= Result
<(), Error
>>,
725 let worker
= WorkerTask
::new(worker_type
, worker_id
, userid
, to_stdout
)?
;
726 let upid_str
= worker
.upid
.to_string();
727 let f
= f(worker
.clone());
728 tokio
::spawn(async
move {
729 let result
= f
.await
;
730 worker
.log_result(&result
);
736 /// Create a new worker thread.
737 pub fn new_thread
<F
>(
739 worker_id
: Option
<String
>,
743 ) -> Result
<String
, Error
>
744 where F
: Send
+ UnwindSafe
+ '
static + FnOnce(Arc
<WorkerTask
>) -> Result
<(), Error
>
746 println
!("register worker thread");
748 let worker
= WorkerTask
::new(worker_type
, worker_id
, userid
, to_stdout
)?
;
749 let upid_str
= worker
.upid
.to_string();
751 let _child
= std
::thread
::Builder
::new().name(upid_str
.clone()).spawn(move || {
752 let worker1
= worker
.clone();
753 let result
= match std
::panic
::catch_unwind(move || f(worker1
)) {
756 match panic
.downcast
::<&str>() {
758 Err(format_err
!("worker panicked: {}", panic_msg
))
761 Err(format_err
!("worker panicked: unknown type."))
767 worker
.log_result(&result
);
773 /// create state from self and a result
774 pub fn create_state(&self, result
: &Result
<(), Error
>) -> TaskState
{
775 let warn_count
= self.data
.lock().unwrap().warn_count
;
777 let endtime
= proxmox
::tools
::time
::epoch_i64();
779 if let Err(err
) = result
{
780 TaskState
::Error { message: err.to_string(), endtime }
781 } else if warn_count
> 0 {
782 TaskState
::Warning { count: warn_count, endtime }
784 TaskState
::OK { endtime }
788 /// Log task result, remove task from running list
789 pub fn log_result(&self, result
: &Result
<(), Error
>) {
790 let state
= self.create_state(result
);
791 self.log(state
.result_text());
793 WORKER_TASK_LIST
.lock().unwrap().remove(&self.upid
.task_id
);
794 let _
= update_active_workers(None
);
795 super::set_worker_count(WORKER_TASK_LIST
.lock().unwrap().len());
799 pub fn log
<S
: AsRef
<str>>(&self, msg
: S
) {
800 let mut data
= self.data
.lock().unwrap();
801 data
.logger
.log(msg
);
804 /// Log a message as warning.
805 pub fn warn
<S
: AsRef
<str>>(&self, msg
: S
) {
806 let mut data
= self.data
.lock().unwrap();
807 data
.logger
.log(format
!("WARN: {}", msg
.as_ref()));
808 data
.warn_count
+= 1;
811 /// Set progress indicator
812 pub fn progress(&self, progress
: f64) {
813 if progress
>= 0.0 && progress
<= 1.0 {
814 let mut data
= self.data
.lock().unwrap();
815 data
.progress
= progress
;
817 // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
822 pub fn request_abort(&self) {
823 eprintln
!("set abort flag for worker {}", self.upid
);
824 self.abort_requested
.store(true, Ordering
::SeqCst
);
826 let mut data
= self.data
.lock().unwrap();
828 match data
.abort_listeners
.pop() {
831 let _
= ch
.send(()); // ignore erros here
837 /// Test if abort was requested.
838 pub fn abort_requested(&self) -> bool
{
839 self.abort_requested
.load(Ordering
::SeqCst
)
842 /// Fail if abort was requested.
843 pub fn fail_on_abort(&self) -> Result
<(), Error
> {
844 if self.abort_requested() {
845 bail
!("abort requested - aborting task");
850 /// Get a future which resolves on task abort
851 pub fn abort_future(&self) -> oneshot
::Receiver
<()> {
852 let (tx
, rx
) = oneshot
::channel
::<()>();
854 let mut data
= self.data
.lock().unwrap();
855 if self.abort_requested() {
858 data
.abort_listeners
.push(tx
);
863 pub fn upid(&self) -> &UPID
{
868 impl crate::task
::TaskState
for WorkerTask
{
869 fn check_abort(&self) -> Result
<(), Error
> {
873 fn log(&self, level
: log
::Level
, message
: &std
::fmt
::Arguments
) {
875 log
::Level
::Error
=> self.warn(&message
.to_string()),
876 log
::Level
::Warn
=> self.warn(&message
.to_string()),
877 log
::Level
::Info
=> self.log(&message
.to_string()),
878 log
::Level
::Debug
=> self.log(&format
!("DEBUG: {}", message
)),
879 log
::Level
::Trace
=> self.log(&format
!("TRACE: {}", message
)),