1 use std
::collections
::{HashMap, VecDeque}
;
3 use std
::path
::PathBuf
;
4 use std
::io
::{Read, Write, BufRead, BufReader}
;
5 use std
::panic
::UnwindSafe
;
6 use std
::sync
::atomic
::{AtomicBool, Ordering}
;
7 use std
::sync
::{Arc, Mutex}
;
8 use std
::time
::{SystemTime, Duration}
;
10 use anyhow
::{bail, format_err, Error}
;
12 use lazy_static
::lazy_static
;
13 use serde_json
::{json, Value}
;
14 use serde
::{Serialize, Deserialize}
;
15 use tokio
::sync
::oneshot
;
16 use nix
::fcntl
::OFlag
;
17 use once_cell
::sync
::OnceCell
;
19 use proxmox
::sys
::linux
::procfs
;
20 use proxmox
::tools
::fs
::{create_path, replace_file, atomic_open_or_create_file, CreateOptions}
;
21 use proxmox_lang
::try_block
;
22 use proxmox_schema
::upid
::UPID
;
24 use pbs_tools
::task
::WorkerTaskContext
;
25 use pbs_tools
::logrotate
::{LogRotate, LogRotateFiles}
;
27 use crate::{CommandSocket, FileLogger, FileLogOptions}
;
// Guard wrapping the open lock file for the task list; presumably the
// advisory lock is released when the inner `File` is dropped (acquired via
// `open_file_locked`, see `lock_task_list_files`) — TODO confirm.
29 struct TaskListLockGuard(File
);
// Holds the file-creation options and the pre-computed paths of all task
// bookkeeping files (all located below the task directory, see `new`).
// NOTE(review): a `taskdir: PathBuf` field (original line 33) is missing from
// this extraction — `self.taskdir` is referenced by several methods below.
31 struct WorkerTaskSetup
{
// options (owner/group/permissions) applied when creating the files below
32 file_opts
: CreateOptions
,
// lock file guarding concurrent access to the active/archive lists
34 task_lock_fn
: PathBuf
,
// list of currently running (active) tasks
35 active_tasks_fn
: PathBuf
,
// legacy index file, only read for backward compatibility (removed on update)
36 task_index_fn
: PathBuf
,
// append-only archive of finished tasks
37 task_archive_fn
: PathBuf
,
40 static WORKER_TASK_SETUP
: OnceCell
<WorkerTaskSetup
> = OnceCell
::new();
42 fn worker_task_setup() -> Result
<&'
static WorkerTaskSetup
, Error
> {
43 WORKER_TASK_SETUP
.get()
44 .ok_or_else(|| format_err
!("WorkerTask library is not initialized"))
47 impl WorkerTaskSetup
{
// Build the setup from a base directory: all bookkeeping files live under
// "<basedir>/tasks". NOTE(review): the final struct construction/return
// (original lines 65-74) is missing from this extraction.
49 fn new(basedir
: PathBuf
, file_opts
: CreateOptions
) -> Self {
// task directory: <basedir>/tasks
51 let mut taskdir
= basedir
.clone();
52 taskdir
.push("tasks");
// lock file guarding list updates
54 let mut task_lock_fn
= taskdir
.clone();
55 task_lock_fn
.push(".active.lock");
// list of running tasks
57 let mut active_tasks_fn
= taskdir
.clone();
58 active_tasks_fn
.push("active");
// legacy index file (pre-1.x compatibility)
60 let mut task_index_fn
= taskdir
.clone();
61 task_index_fn
.push("index");
// append-only archive of finished tasks
63 let mut task_archive_fn
= taskdir
.clone();
64 task_archive_fn
.push("archive");
// Acquire the task-list lock file (shared or exclusive) and return a guard
// that holds it open. Uses a 10 second timeout while waiting for the lock.
// NOTE(review): the arguments passed to `open_file_locked` (original lines
// 83-88, presumably lock path, timeout, `exclusive` and options) are missing
// from this extraction.
76 fn lock_task_list_files(&self, exclusive
: bool
) -> Result
<TaskListLockGuard
, Error
> {
// lock file created with mode 0660
77 let options
= self.file_opts
.clone()
78 .perm(nix
::sys
::stat
::Mode
::from_bits_truncate(0o660));
// give up after 10 seconds waiting for the flock
80 let timeout
= std
::time
::Duration
::new(10, 0);
82 let file
= proxmox
::tools
::fs
::open_file_locked(
89 Ok(TaskListLockGuard(file
))
92 fn log_path(&self, upid
: &UPID
) -> std
::path
::PathBuf
{
93 let mut path
= self.taskdir
.clone();
94 path
.push(format
!("{:02X}", upid
.pstart
% 256));
95 path
.push(upid
.to_string());
99 // atomically read/update the task list, update status of finished tasks
100 // new_upid is added to the list when specified.
// NOTE(review): this extraction has gaps (original lines 113-114, 126-135,
// 144-150, 157-159, 167-169, 179-185 missing) — e.g. the filter_map closure
// header, parts of the TaskListInfo construction, the `replace_file` call
// writing the active list, and the closing braces are not visible here.
101 fn update_active_workers(&self, new_upid
: Option
<&UPID
>) -> Result
<(), Error
> {
// exclusive lock: we rewrite both the active list and the archive
103 let lock
= self.lock_task_list_files(true)?
;
105 // TODO remove with 1.x
106 let mut finish_list
: Vec
<TaskListInfo
> = read_task_file_from_path(&self.task_index_fn
)?
;
107 let had_index_file
= !finish_list
.is_empty();
109 // We use filter_map because one negative case wants to *move* the data into `finish_list`,
110 // clippy doesn't quite catch this!
111 #[allow(clippy::unnecessary_filter_map)]
112 let mut active_list
: Vec
<TaskListInfo
> = read_task_file_from_path(&self.active_tasks_fn
)?
115 if info
.state
.is_some() {
116 // this can happen when the active file still includes finished tasks
117 finish_list
.push(info
);
// task no longer running locally: read its final status from its log file
121 if !worker_is_active_local(&info
.upid
) {
122 // println!("Detected stopped task '{}'", &info.upid_str);
123 let now
= proxmox_time
::epoch_i64();
// fall back to Unknown with the current time if the log has no status
124 let status
= upid_read_status(&info
.upid
).unwrap_or(TaskState
::Unknown { endtime: now }
);
125 finish_list
.push(TaskListInfo
{
127 upid_str
: info
.upid_str
,
// append the newly started task (still running, so state is None)
136 if let Some(upid
) = new_upid
{
137 active_list
.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None }
);
140 let active_raw
= render_task_list(&active_list
);
142 let options
= self.file_opts
.clone()
143 .perm(nix
::sys
::stat
::Mode
::from_bits_truncate(0o660));
146 &self.active_tasks_fn
,
147 active_raw
.as_bytes(),
// archive order: finished tasks sorted by state (endtime), running last
151 finish_list
.sort_unstable_by(|a
, b
| {
152 match (&a
.state
, &b
.state
) {
153 (Some(s1
), Some(s2
)) => s1
.cmp(&s2
),
154 (Some(_
), None
) => std
::cmp
::Ordering
::Less
,
155 (None
, Some(_
)) => std
::cmp
::Ordering
::Greater
,
156 _
=> a
.upid
.starttime
.cmp(&b
.upid
.starttime
),
// append all newly finished tasks to the archive file
160 if !finish_list
.is_empty() {
161 let options
= self.file_opts
.clone()
162 .perm(nix
::sys
::stat
::Mode
::from_bits_truncate(0o660));
164 let mut writer
= atomic_open_or_create_file(
165 &self.task_archive_fn
,
166 OFlag
::O_APPEND
| OFlag
::O_RDWR
,
170 for info
in &finish_list
{
171 writer
.write_all(render_task_line(&info
).as_bytes())?
;
175 // TODO Remove with 1.x
176 // for compatibility, if we had an INDEX file, we do not need it anymore
// best effort: ignore unlink errors
178 let _
= nix
::unistd
::unlink(&self.task_index_fn
);
186 // Create task log directory with correct permissions
// NOTE(review): the opening of the `try_block!` (original lines 188-189) and
// some trailing lines are missing from this extraction; the `}).map_err(...)`
// below closes that block.
187 fn create_task_log_dirs(&self) -> Result
<(), Error
> {
// directories are created with mode 0755
190 let dir_opts
= self.file_opts
.clone()
191 .perm(nix
::sys
::stat
::Mode
::from_bits_truncate(0o755));
// create the task directory (and, per the options, set owner on it)
193 create_path(&self.taskdir
, Some(dir_opts
.clone()), Some(dir_opts
.clone()))?
;
194 // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
// wrap any error with a descriptive message
196 }).map_err(|err
: Error
| format_err
!("unable to create task log dir - {}", err
))
200 /// Initialize the WorkerTask library
201 pub fn init_worker_tasks(basedir
: PathBuf
, file_opts
: CreateOptions
) -> Result
<(), Error
> {
202 let setup
= WorkerTaskSetup
::new(basedir
, file_opts
);
203 setup
.create_task_log_dirs()?
;
204 WORKER_TASK_SETUP
.set(setup
)
205 .map_err(|_
| format_err
!("init_worker_tasks failed - already initialized"))
208 /// checks if the Task Archive is bigger that 'size_threshold' bytes, and
209 /// rotates it if it is
210 pub fn rotate_task_log_archive(size_threshold
: u64, compress
: bool
, max_files
: Option
<usize>) -> Result
<bool
, Error
> {
212 let setup
= worker_task_setup()?
;
214 let _lock
= setup
.lock_task_list_files(true)?
;
216 let mut logrotate
= LogRotate
::new(&setup
.task_archive_fn
, compress
)
217 .ok_or_else(|| format_err
!("could not get archive file names"))?
;
219 logrotate
.rotate(size_threshold
, None
, max_files
)
222 /// removes all task logs that are older than the oldest task entry in the
// NOTE(review): this extraction has gaps — notably the loop over the 256 log
// shard directories (original line ~255, presumably `for i in 0..256`), the
// `Ok(_) => {}` match arm, and several closing braces are missing.
224 pub fn cleanup_old_tasks(compressed
: bool
) -> Result
<(), Error
> {
225 let setup
= worker_task_setup()?
;
// exclusive lock while scanning/removing log files
227 let _lock
= setup
.lock_task_list_files(true)?
;
229 let logrotate
= LogRotate
::new(&setup
.task_archive_fn
, compressed
)
230 .ok_or_else(|| format_err
!("could not get archive file names"))?
;
// endtime of the last parsable entry in the oldest archive file
232 let mut timestamp
= None
;
233 if let Some(last_file
) = logrotate
.files().last() {
234 let reader
= BufReader
::new(last_file
);
235 for line
in reader
.lines() {
237 if let Ok((_
, _
, Some(state
))) = parse_worker_status_line(&line
) {
238 timestamp
= Some(state
.endtime());
// helper: mtime of a directory entry
244 fn get_modified(entry
: std
::fs
::DirEntry
) -> Result
<SystemTime
, std
::io
::Error
> {
245 entry
.metadata()?
.modified()
248 if let Some(timestamp
) = timestamp
{
// convert the (possibly negative) epoch timestamp into a SystemTime
249 let cutoff_time
= if timestamp
> 0 {
250 SystemTime
::UNIX_EPOCH
.checked_add(Duration
::from_secs(timestamp
as u64))
252 SystemTime
::UNIX_EPOCH
.checked_sub(Duration
::from_secs(-timestamp
as u64))
253 }.ok_or_else(|| format_err
!("could not calculate cutoff time"))?
;
// scan each two-hex-digit shard directory below taskdir
256 let mut path
= setup
.taskdir
.clone();
257 path
.push(format
!("{:02X}", i
));
258 for file
in std
::fs
::read_dir(path
)?
{
260 let path
= file
.path();
262 let modified
= get_modified(file
)
263 .map_err(|err
| format_err
!("error getting mtime for {:?}: {}", path
, err
))?
;
// remove log files older than the cutoff; a concurrent removal is fine
265 if modified
< cutoff_time
{
266 match std
::fs
::remove_file(path
) {
268 Err(err
) if err
.kind() == std
::io
::ErrorKind
::NotFound
=> {}
,
269 Err(err
) => bail
!("could not remove file: {}", err
),
280 /// Path to the worker log file
281 pub fn upid_log_path(upid
: &UPID
) -> Result
<std
::path
::PathBuf
, Error
> {
282 let setup
= worker_task_setup()?
;
283 Ok(setup
.log_path(upid
))
286 /// Read endtime (time of last log line) and exitstatus from task log file
287 /// If there is not a single line with at valid datetime, we assume the
288 /// starttime to be the endtime
// NOTE(review): this extraction has gaps — the `data.pop()` inside the
// newline-stripping loop, the `None => &data` match arm, the final
// `status = state` assignment and the closing braces/`Ok(status)` return
// are missing from view.
289 pub fn upid_read_status(upid
: &UPID
) -> Result
<TaskState
, Error
> {
291 let setup
= worker_task_setup()?
;
// default: unknown state, ended at its start time
293 let mut status
= TaskState
::Unknown { endtime: upid.starttime }
;
295 let path
= setup
.log_path(upid
);
297 let mut file
= File
::open(path
)?
;
299 /// speedup - only read tail
301 use std
::io
::SeekFrom
;
302 let _
= file
.seek(SeekFrom
::End(-8192)); // ignore errors
304 let mut data
= Vec
::with_capacity(8192);
305 file
.read_to_end(&mut data
)?
;
307 // strip newlines at the end of the task logs
308 while data
.last() == Some(&b'
\n'
) {
// find the last complete line in the tail
312 let last_line
= match data
.iter().rposition(|c
| *c
== b'
\n'
) {
313 Some(start
) if data
.len() > (start
+1) => &data
[start
+1..],
314 Some(_
) => &data
, // should not happen, since we removed all trailing newlines
318 let last_line
= std
::str::from_utf8(last_line
)
319 .map_err(|err
| format_err
!("upid_read_status: utf8 parse failed: {}", err
))?
;
// log line format: "<rfc3339 time>: TASK <status text>"
321 let mut iter
= last_line
.splitn(2, ": ");
322 if let Some(time_str
) = iter
.next() {
323 if let Ok(endtime
) = proxmox_time
::parse_rfc3339(time_str
) {
324 // set the endtime even if we cannot parse the state
325 status
= TaskState
::Unknown { endtime }
;
326 if let Some(rest
) = iter
.next().and_then(|rest
| rest
.strip_prefix("TASK ")) {
327 if let Ok(state
) = TaskState
::from_endtime_and_message(endtime
, rest
) {
// All worker tasks running in this process, keyed by their task id.
// NOTE(review): the surrounding `lazy_static! { ... }` wrapper (original
// lines 337/339) is missing from this extraction.
338 static ref WORKER_TASK_LIST
: Mutex
<HashMap
<usize, Arc
<WorkerTask
>>> = Mutex
::new(HashMap
::new());
341 /// checks if the task UPID refers to a worker from this process
342 fn is_local_worker(upid
: &UPID
) -> bool
{
343 upid
.pid
== crate::pid() && upid
.pstart
== crate::pstart()
346 /// Test if the task is still running
// NOTE(review): this extraction has gaps — the early `return Ok(false)` when
// the owning process is gone, the `json!({...})` scaffolding around the
// command object, the `return Ok(active)` and closing braces are missing.
347 pub async
fn worker_is_active(upid
: &UPID
) -> Result
<bool
, Error
> {
// worker from this very process: just consult the in-memory task list
348 if is_local_worker(upid
) {
349 return Ok(WORKER_TASK_LIST
.lock().unwrap().contains_key(&upid
.task_id
));
// owning process no longer running => the task cannot be active
352 if procfs
::check_process_running_pstart(upid
.pid
, upid
.pstart
).is_none() {
// otherwise ask the owning process via its control socket
356 let sock
= crate::ctrl_sock_from_pid(upid
.pid
);
358 "command": "worker-task-status",
360 "upid": upid
.to_string(),
363 let status
= crate::send_command(sock
, &cmd
).await?
;
365 if let Some(active
) = status
.as_bool() {
368 bail
!("got unexpected result {:?} (expected bool)", status
);
372 /// Test if the task is still running (fast but inaccurate implementation)
374 /// If the task is spawned from a different process, we simply return if
375 /// that process is still running. This information is good enough to detect
377 pub fn worker_is_active_local(upid
: &UPID
) -> bool
{
378 if is_local_worker(upid
) {
379 WORKER_TASK_LIST
.lock().unwrap().contains_key(&upid
.task_id
)
381 procfs
::check_process_running_pstart(upid
.pid
, upid
.pstart
).is_some()
385 /// Register task control command on a [CommandSocket].
387 /// This create two commands:
389 /// * ``worker-task-abort <UPID>``: calls [abort_local_worker]
391 /// * ``worker-task-status <UPID>``: return true of false, depending on
392 /// whether the worker is running or stopped.
// NOTE(review): this extraction has gaps — the `Ok(upid)` return of
// `get_upid`, the `Ok(Value::Null)` / `Ok(active.into())` closure results,
// the `?` after each register_command call and the final `Ok(())` are missing.
393 pub fn register_task_control_commands(
394 commando_sock
: &mut CommandSocket
,
395 ) -> Result
<(), Error
> {
// helper: extract and validate the "upid" argument of a command
396 fn get_upid(args
: Option
<&Value
>) -> Result
<UPID
, Error
> {
397 let args
= if let Some(args
) = args { args }
else { bail!("missing args") }
;
398 let upid
= match args
.get("upid") {
399 Some(Value
::String(upid
)) => upid
.parse
::<UPID
>()?
,
400 None
=> bail
!("no upid in args"),
401 _
=> bail
!("unable to parse upid"),
// only accept UPIDs belonging to this very process
403 if !is_local_worker(&upid
) {
404 bail
!("upid does not belong to this process");
// command 1: abort a local worker
409 commando_sock
.register_command("worker-task-abort".into(), move |args
| {
410 let upid
= get_upid(args
)?
;
412 abort_local_worker(upid
);
// command 2: query whether a local worker is still running
416 commando_sock
.register_command("worker-task-status".into(), move |args
| {
417 let upid
= get_upid(args
)?
;
419 let active
= WORKER_TASK_LIST
.lock().unwrap().contains_key(&upid
.task_id
);
427 /// Try to abort a worker task, but do no wait
429 /// Errors (if any) are simply logged.
430 pub fn abort_worker_nowait(upid
: UPID
) {
431 tokio
::spawn(async
move {
432 if let Err(err
) = abort_worker(upid
).await
{
433 log
::error
!("abort worker task failed - {}", err
);
438 /// Abort a worker task
440 /// By sending ``worker-task-abort`` to the control socket.
// NOTE(review): the `json!({ ... })` scaffolding building `cmd` (original
// lines 444/446/448-449) and the closing brace are missing from this
// extraction; only the command name and upid argument lines are visible.
441 pub async
fn abort_worker(upid
: UPID
) -> Result
<(), Error
> {
// control socket of the process owning the task
443 let sock
= crate::ctrl_sock_from_pid(upid
.pid
);
445 "command": "worker-task-abort",
447 "upid": upid
.to_string(),
// send and discard the (null) result
450 crate::send_command(sock
, &cmd
).map_ok(|_
| ()).await
453 fn parse_worker_status_line(line
: &str) -> Result
<(String
, UPID
, Option
<TaskState
>), Error
> {
455 let data
= line
.splitn(3, ' '
).collect
::<Vec
<&str>>();
457 let len
= data
.len();
460 1 => Ok((data
[0].to_owned(), data
[0].parse
::<UPID
>()?
, None
)),
462 let endtime
= i64::from_str_radix(data
[1], 16)?
;
463 let state
= TaskState
::from_endtime_and_message(endtime
, data
[2])?
;
464 Ok((data
[0].to_owned(), data
[0].parse
::<UPID
>()?
, Some(state
)))
466 _
=> bail
!("wrong number of components"),
// Final state of a finished task, each variant carrying its end time.
// NOTE(review): the `pub enum TaskState {` header line, the
// `OK { endtime: i64 },` variant (referenced elsewhere in this file) and the
// closing brace are missing from this extraction.
471 #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
473 /// The Task ended with an undefined state
474 Unknown { endtime: i64 }
,
475 /// The Task ended and there were no errors or warnings
477 /// The Task had 'count' amount of warnings and no errors
478 Warning { count: u64, endtime: i64 }
,
479 /// The Task ended with the error described in 'message'
480 Error { message: String, endtime: i64 }
,
484 pub fn endtime(&self) -> i64 {
486 TaskState
::Unknown { endtime }
=> endtime
,
487 TaskState
::OK { endtime }
=> endtime
,
488 TaskState
::Warning { endtime, .. }
=> endtime
,
489 TaskState
::Error { endtime, .. }
=> endtime
,
493 fn result_text(&self) -> String
{
495 TaskState
::Error { message, .. }
=> format
!("TASK ERROR: {}", message
),
496 other
=> format
!("TASK {}", other
),
500 fn from_endtime_and_message(endtime
: i64, s
: &str) -> Result
<Self, Error
> {
502 Ok(TaskState
::Unknown { endtime }
)
503 } else if s
== "OK" {
504 Ok(TaskState
::OK { endtime }
)
505 } else if let Some(warnings
) = s
.strip_prefix("WARNINGS: ") {
506 let count
: u64 = warnings
.parse()?
;
507 Ok(TaskState
::Warning{ count, endtime }
)
508 } else if !s
.is_empty() {
509 let message
= if let Some(err
) = s
.strip_prefix("ERROR: ") { err }
else { s }
.to_string();
510 Ok(TaskState
::Error{ message, endtime }
)
512 bail
!("unable to parse Task Status '{}'", s
);
517 impl std
::cmp
::PartialOrd
for TaskState
{
518 fn partial_cmp(&self, other
: &Self) -> Option
<std
::cmp
::Ordering
> {
519 Some(self.endtime().cmp(&other
.endtime()))
523 impl std
::cmp
::Ord
for TaskState
{
524 fn cmp(&self, other
: &Self) -> std
::cmp
::Ordering
{
525 self.endtime().cmp(&other
.endtime())
529 impl std
::fmt
::Display
for TaskState
{
530 fn fmt(&self, f
: &mut std
::fmt
::Formatter
<'_
>) -> std
::fmt
::Result
{
532 TaskState
::Unknown { .. }
=> write
!(f
, "unknown"),
533 TaskState
::OK { .. }
=> write
!(f
, "OK"),
534 TaskState
::Warning { count, .. }
=> write
!(f
, "WARNINGS: {}", count
),
535 TaskState
::Error { message, .. }
=> write
!(f
, "{}", message
),
540 /// Task details including parsed UPID
542 /// If there is no `state`, the task is still running.
// NOTE(review): the derive attribute, the `pub upid: UPID` field (original
// lines 545-546) and the closing brace are missing from this extraction.
544 pub struct TaskListInfo
{
547 /// UPID string representation
548 pub upid_str
: String
,
549 /// Task `(endtime, status)` if already finished
550 pub state
: Option
<TaskState
>, // endtime, status
553 fn render_task_line(info
: &TaskListInfo
) -> String
{
554 let mut raw
= String
::new();
555 if let Some(status
) = &info
.state
{
556 raw
.push_str(&format
!("{} {:08X} {}\n", info
.upid_str
, status
.endtime(), status
));
558 raw
.push_str(&info
.upid_str
);
565 fn render_task_list(list
: &[TaskListInfo
]) -> String
{
566 let mut raw
= String
::new();
568 raw
.push_str(&render_task_line(&info
));
573 // note this is not locked, caller has to make sure it is
574 // this will skip (and log) lines that are not valid status lines
// NOTE(review): this extraction has gaps — the opening brace, the
// `let line = line?;` inside the loop, the TaskListInfo field list of the
// push, the `Err(err) =>` arm header and the final `Ok(list)` are missing.
575 fn read_task_file
<R
: Read
>(reader
: R
) -> Result
<Vec
<TaskListInfo
>, Error
>
577 let reader
= BufReader
::new(reader
);
578 let mut list
= Vec
::new();
579 for line
in reader
.lines() {
581 match parse_worker_status_line(&line
) {
582 Ok((upid_str
, upid
, state
)) => list
.push(TaskListInfo
{
// unparsable lines are logged and skipped, not fatal
588 log
::warn
!("unable to parse worker status '{}' - {}", line
, err
);
597 // note this is not locked, caller has to make sure it is
598 fn read_task_file_from_path
<P
>(path
: P
) -> Result
<Vec
<TaskListInfo
>, Error
>
600 P
: AsRef
<std
::path
::Path
> + std
::fmt
::Debug
,
602 let file
= match File
::open(&path
) {
604 Err(err
) if err
.kind() == std
::io
::ErrorKind
::NotFound
=> return Ok(Vec
::new()),
605 Err(err
) => bail
!("unable to open task list {:?} - {}", path
, err
),
611 /// Iterate over existing/active worker tasks
// NOTE(review): at least one field (original line 614, presumably an
// end/exhaustion flag used by the Iterator impl) and the closing brace are
// missing from this extraction.
612 pub struct TaskListInfoIterator
{
// tasks not yet yielded; popped from the back by `next()`
613 list
: VecDeque
<TaskListInfo
>,
// remaining archive files, None once exhausted or when iterating active only
615 archive
: Option
<LogRotateFiles
>,
// read lock held while iterating the archive (None for active-only mode)
616 lock
: Option
<TaskListLockGuard
>,
619 impl TaskListInfoIterator
{
620 /// Creates a new iterator instance.
// NOTE(review): this extraction has gaps — the `.iter()` feeding `.any`,
// the re-read block's tail, the `else` branch opening the archive vs. the
// active-only `None`, and the final `Ok(Self { ... })` construction are
// missing from view.
621 pub fn new(active_only
: bool
) -> Result
<Self, Error
> {
623 let setup
= worker_task_setup()?
;
// take a shared lock and snapshot the active list
625 let (read_lock
, active_list
) = {
626 let lock
= setup
.lock_task_list_files(false)?
;
627 let active_list
= read_task_file_from_path(&setup
.active_tasks_fn
)?
;
// stale entries present? (finished tasks or dead workers)
629 let needs_update
= active_list
631 .any(|info
| info
.state
.is_some() || !worker_is_active_local(&info
.upid
))
;
633 // TODO remove with 1.x
634 let index_exists
= setup
.task_index_fn
.is_file();
// refresh the on-disk list, then re-acquire lock and re-read
636 if needs_update
|| index_exists
{
638 setup
.update_active_workers(None
)?
;
639 let lock
= setup
.lock_task_list_files(false)?
;
640 let active_list
= read_task_file_from_path(&setup
.active_tasks_fn
)?
;
// archive files are only consulted when not iterating active tasks only
647 let archive
= if active_only
{
650 let logrotate
= LogRotate
::new(&setup
.task_archive_fn
, true)
651 .ok_or_else(|| format_err
!("could not get archive file names"))?
;
652 Some(logrotate
.files())
// keep the read lock only while the archive will be walked
655 let lock
= if active_only { None }
else { Some(read_lock) }
;
658 list
: active_list
.into(),
// Yields buffered entries first, then refills the buffer one archive file
// at a time. NOTE(review): this extraction has gaps — the surrounding
// `loop`, the `Ok(list) => list` arm, and the terminating branches
// (archive exhausted / end flag) are missing from view.
666 impl Iterator
for TaskListInfoIterator
{
667 type Item
= Result
<TaskListInfo
, Error
>;
669 fn next(&mut self) -> Option
<Self::Item
> {
// drain the current buffer back-to-front
671 if let Some(element
) = self.list
.pop_back() {
672 return Some(Ok(element
));
// refill from the next archive file, if any
676 if let Some(mut archive
) = self.archive
.take() {
677 if let Some(file
) = archive
.next() {
678 let list
= match read_task_file(file
) {
680 Err(err
) => return Some(Err(err
)),
682 self.list
.append(&mut list
.into());
// put the remaining archive files back for the next refill
683 self.archive
= Some(archive
);
695 /// Launch long running worker tasks.
697 /// A worker task can either be a whole thread, or a simply tokio
698 /// task/future. Each task can `log()` messages, which are stored
699 /// persistently to files. Task should poll the `abort_requested`
700 /// flag, and stop execution when requested.
// NOTE(review): the `upid: UPID` field (original line 703, referenced as
// `self.upid` elsewhere) and the closing brace are missing from this
// extraction.
701 pub struct WorkerTask
{
// global setup (paths/options); lives for the whole process
702 setup
: &'
static WorkerTaskSetup
,
// mutable per-task state (logger, progress, listeners), see WorkerTaskData
704 data
: Mutex
<WorkerTaskData
>,
// set once via request_abort(); polled by the task itself
705 abort_requested
: AtomicBool
,
// Display a worker task. NOTE(review): the body (original lines 711-713) is
// missing from this extraction — presumably it formats `self.upid`; confirm
// against the original source.
708 impl std
::fmt
::Display
for WorkerTask
{
710 fn fmt(&self, f
: &mut std
::fmt
::Formatter
) -> std
::fmt
::Result
{
// Mutable per-task state, kept behind the WorkerTask::data mutex.
// NOTE(review): fields on the original lines 716 and 718 (presumably the
// `logger` and `warn_count` used by log_message/log_warning) and the closing
// brace are missing from this extraction.
715 struct WorkerTaskData
{
717 progress
: f64, // 0..1
// oneshot senders fired when an abort is requested (see abort_future)
719 pub abort_listeners
: Vec
<oneshot
::Sender
<()>>,
// Constructor of a WorkerTask. NOTE(review): this extraction starts
// mid-signature (the `pub fn new(worker_type...` header is missing) and has
// further gaps: parts of FileLogOptions, the Self/WorkerTaskData field lists,
// and the final `Ok(worker)` are not visible.
726 worker_id
: Option
<String
>,
729 ) -> Result
<Arc
<Self>, Error
> {
731 let setup
= worker_task_setup()?
;
// allocate a fresh UPID for this task
733 let upid
= UPID
::new(worker_type
, worker_id
, auth_id
)?
;
734 let task_id
= upid
.task_id
;
// log file lives in the shard directory <taskdir>/<pstart & 255 as hex>
736 let mut path
= setup
.taskdir
.clone();
738 path
.push(format
!("{:02X}", upid
.pstart
& 255));
740 let dir_opts
= setup
.file_opts
.clone()
741 .perm(nix
::sys
::stat
::Mode
::from_bits_truncate(0o755));
743 create_path(&path
, None
, Some(dir_opts
))?
;
745 path
.push(upid
.to_string());
747 let logger_options
= FileLogOptions
{
752 file_opts
: setup
.file_opts
.clone(),
755 let logger
= FileLogger
::new(&path
, logger_options
)?
;
757 let worker
= Arc
::new(Self {
760 abort_requested
: AtomicBool
::new(false),
761 data
: Mutex
::new(WorkerTaskData
{
765 abort_listeners
: vec
![],
769 // scope to drop the lock again after inserting
771 let mut hash
= WORKER_TASK_LIST
.lock().unwrap();
772 hash
.insert(task_id
, worker
.clone());
773 crate::set_worker_count(hash
.len());
// persist the new task into the active list on disk
776 setup
.update_active_workers(Some(&upid
))?
;
781 /// Spawn a new tokio task/future.
// NOTE(review): this extraction has gaps — the `pub fn spawn(` header with
// its leading parameters, the opening brace, and the trailing
// `Ok(upid_str)` return are missing from view.
784 worker_id
: Option
<String
>,
788 ) -> Result
<String
, Error
>
// F builds the future from the worker handle; T is that future
789 where F
: Send
+ '
static + FnOnce(Arc
<WorkerTask
>) -> T
,
790 T
: Send
+ '
static + Future
<Output
= Result
<(), Error
>>,
792 let worker
= WorkerTask
::new(worker_type
, worker_id
, auth_id
, to_stdout
)?
;
793 let upid_str
= worker
.upid
.to_string();
// build the future before spawning, with its own Arc handle
794 let f
= f(worker
.clone());
795 tokio
::spawn(async
move {
796 let result
= f
.await
;
// record the outcome and deregister the task
797 worker
.log_result(&result
);
803 /// Create a new worker thread.
// NOTE(review): this extraction has gaps — leading parameters of the
// signature, the `Ok(r) => r` arm of catch_unwind, the `Ok(panic_msg)` /
// downcast-to-String handling, the thread tail and the `Ok(upid_str)`
// return are missing from view.
804 pub fn new_thread
<F
>(
806 worker_id
: Option
<String
>,
810 ) -> Result
<String
, Error
>
// UnwindSafe required because the closure runs under catch_unwind below
811 where F
: Send
+ UnwindSafe
+ '
static + FnOnce(Arc
<WorkerTask
>) -> Result
<(), Error
>
813 let worker
= WorkerTask
::new(worker_type
, worker_id
, auth_id
, to_stdout
)?
;
814 let upid_str
= worker
.upid
.to_string();
// dedicated OS thread named after the UPID
816 let _child
= std
::thread
::Builder
::new().name(upid_str
.clone()).spawn(move || {
817 let worker1
= worker
.clone();
// convert panics into task errors instead of crashing the process
818 let result
= match std
::panic
::catch_unwind(move || f(worker1
)) {
821 match panic
.downcast
::<&str>() {
823 Err(format_err
!("worker panicked: {}", panic_msg
))
826 Err(format_err
!("worker panicked: unknown type."))
832 worker
.log_result(&result
);
838 /// create state from self and a result
839 pub fn create_state(&self, result
: &Result
<(), Error
>) -> TaskState
{
840 let warn_count
= self.data
.lock().unwrap().warn_count
;
842 let endtime
= proxmox_time
::epoch_i64();
844 if let Err(err
) = result
{
845 TaskState
::Error { message: err.to_string(), endtime }
846 } else if warn_count
> 0 {
847 TaskState
::Warning { count: warn_count, endtime }
849 TaskState
::OK { endtime }
853 /// Log task result, remove task from running list
854 pub fn log_result(&self, result
: &Result
<(), Error
>) {
855 let state
= self.create_state(result
);
856 self.log_message(state
.result_text());
858 WORKER_TASK_LIST
.lock().unwrap().remove(&self.upid
.task_id
);
859 let _
= self.setup
.update_active_workers(None
);
860 crate::set_worker_count(WORKER_TASK_LIST
.lock().unwrap().len());
864 pub fn log_message
<S
: AsRef
<str>>(&self, msg
: S
) {
865 let mut data
= self.data
.lock().unwrap();
866 data
.logger
.log(msg
);
869 /// Log a message as warning.
870 pub fn log_warning
<S
: AsRef
<str>>(&self, msg
: S
) {
871 let mut data
= self.data
.lock().unwrap();
872 data
.logger
.log(format
!("WARN: {}", msg
.as_ref()));
873 data
.warn_count
+= 1;
876 /// Set progress indicator
877 pub fn progress(&self, progress
: f64) {
878 if progress
>= 0.0 && progress
<= 1.0 {
879 let mut data
= self.data
.lock().unwrap();
880 data
.progress
= progress
;
882 // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
887 pub fn request_abort(&self) {
888 let prev_abort
= self.abort_requested
.swap(true, Ordering
::SeqCst
);
889 if !prev_abort
{ // log abort one time
890 self.log_message(format
!("received abort request ..."));
893 let mut data
= self.data
.lock().unwrap();
895 match data
.abort_listeners
.pop() {
898 let _
= ch
.send(()); // ignore errors here
904 /// Get a future which resolves on task abort
905 pub fn abort_future(&self) -> oneshot
::Receiver
<()> {
906 let (tx
, rx
) = oneshot
::channel
::<()>();
908 let mut data
= self.data
.lock().unwrap();
909 if self.abort_requested() {
912 data
.abort_listeners
.push(tx
);
917 pub fn upid(&self) -> &UPID
{
922 impl WorkerTaskContext
for WorkerTask
{
924 fn abort_requested(&self) -> bool
{
925 self.abort_requested
.load(Ordering
::SeqCst
)
928 fn shutdown_requested(&self) -> bool
{
929 crate::shutdown_requested()
932 fn fail_on_shutdown(&self) -> Result
<(), Error
> {
933 crate::fail_on_shutdown()
936 fn log(&self, level
: log
::Level
, message
: &std
::fmt
::Arguments
) {
938 log
::Level
::Error
=> self.log_warning(&message
.to_string()),
939 log
::Level
::Warn
=> self.log_warning(&message
.to_string()),
940 log
::Level
::Info
=> self.log_message(&message
.to_string()),
941 log
::Level
::Debug
=> self.log_message(&format
!("DEBUG: {}", message
)),
942 log
::Level
::Trace
=> self.log_message(&format
!("TRACE: {}", message
)),
947 /// Wait for a locally spanned worker task
949 /// Note: local workers should print logs to stdout, so there is no
950 /// need to fetch/display logs. We just wait for the worker to finish.
// NOTE(review): this extraction has gaps — the polling `loop` around the
// check, its exit branch and the final `Ok(())` are missing from view.
951 pub async
fn wait_for_local_worker(upid_str
: &str) -> Result
<(), Error
> {
953 let upid
: UPID
= upid_str
.parse()?
;
// poll every 100ms
955 let sleep_duration
= core
::time
::Duration
::new(0, 100_000_000);
958 if worker_is_active_local(&upid
) {
959 tokio
::time
::sleep(sleep_duration
).await
;
967 /// Request abort of a local worker (if existing and running)
// Looks the task up in the in-process list; a missing entry (already
// finished or foreign task) is silently ignored. NOTE(review): the closing
// braces fall outside the visible extraction.
968 pub fn abort_local_worker(upid
: UPID
) {
969 if let Some(ref worker
) = WORKER_TASK_LIST
.lock().unwrap().get(&upid
.task_id
) {
970 worker
.request_abort();