1 use std
::collections
::{HashMap, VecDeque}
;
3 use std
::io
::{Read, Write, BufRead, BufReader}
;
4 use std
::panic
::UnwindSafe
;
5 use std
::sync
::atomic
::{AtomicBool, Ordering}
;
6 use std
::sync
::{Arc, Mutex}
;
8 use anyhow
::{bail, format_err, Error}
;
10 use lazy_static
::lazy_static
;
11 use serde_json
::{json, Value}
;
12 use serde
::{Serialize, Deserialize}
;
13 use tokio
::sync
::oneshot
;
15 use proxmox
::sys
::linux
::procfs
;
16 use proxmox
::try_block
;
17 use proxmox
::tools
::fs
::{create_path, replace_file, CreateOptions}
;
20 use pbs_tools
::logrotate
::{LogRotate, LogRotateFiles}
;
22 use super::{UPID, UPIDExt}
;
25 use crate::tools
::{FileLogger, FileLogOptions}
;
26 use crate::api2
::types
::{Authid, TaskStateType}
;
27 use crate::backup
::{open_backup_lockfile, BackupLockGuard}
;
/// Build a path below the task log directory at compile time.
///
/// Concatenates the log directory literal provided by `pbs_buildcfg`,
/// the `"/tasks"` component and the given `$subdir` literal.
macro_rules! taskdir {
    ($subdir:expr) => (concat!(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
}
/// Directory holding the task list files and the per-task log directories.
pub const PROXMOX_BACKUP_TASK_DIR: &str = taskdir!("/");
/// Lock file guarding access to the task list files.
pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = taskdir!("/.active.lock");
/// List of active tasks (may still contain entries of tasks that finished meanwhile).
pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = taskdir!("/active");
/// Legacy index file; only read for compatibility and removed afterwards.
pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = taskdir!("/index");
/// Archive of finished tasks (rotated by `rotate_task_log_archive`).
pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive");
lazy_static! {
    /// Worker tasks registered in this process, keyed by the `task_id` of their UPID.
    static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
}
42 /// checks if the task UPID refers to a worker from this process
43 fn is_local_worker(upid
: &UPID
) -> bool
{
44 upid
.pid
== server
::pid() && upid
.pstart
== server
::pstart()
47 /// Test if the task is still running
48 pub async
fn worker_is_active(upid
: &UPID
) -> Result
<bool
, Error
> {
49 if is_local_worker(upid
) {
50 return Ok(WORKER_TASK_LIST
.lock().unwrap().contains_key(&upid
.task_id
));
53 if procfs
::check_process_running_pstart(upid
.pid
, upid
.pstart
).is_none() {
57 let sock
= server
::ctrl_sock_from_pid(upid
.pid
);
59 "command": "worker-task-status",
61 "upid": upid
.to_string(),
64 let status
= super::send_command(sock
, &cmd
).await?
;
66 if let Some(active
) = status
.as_bool() {
69 bail
!("got unexpected result {:?} (expected bool)", status
);
73 /// Test if the task is still running (fast but inaccurate implementation)
75 /// If the task is spawned from a different process, we simply return if
76 /// that process is still running. This information is good enough to detect
78 pub fn worker_is_active_local(upid
: &UPID
) -> bool
{
79 if is_local_worker(upid
) {
80 WORKER_TASK_LIST
.lock().unwrap().contains_key(&upid
.task_id
)
82 procfs
::check_process_running_pstart(upid
.pid
, upid
.pstart
).is_some()
86 pub fn register_task_control_commands(
87 commando_sock
: &mut super::CommandoSocket
,
88 ) -> Result
<(), Error
> {
89 fn get_upid(args
: Option
<&Value
>) -> Result
<UPID
, Error
> {
90 let args
= if let Some(args
) = args { args }
else { bail!("missing args") }
;
91 let upid
= match args
.get("upid") {
92 Some(Value
::String(upid
)) => upid
.parse
::<UPID
>()?
,
93 None
=> bail
!("no upid in args"),
94 _
=> bail
!("unable to parse upid"),
96 if !is_local_worker(&upid
) {
97 bail
!("upid does not belong to this process");
102 commando_sock
.register_command("worker-task-abort".into(), move |args
| {
103 let upid
= get_upid(args
)?
;
105 if let Some(ref worker
) = WORKER_TASK_LIST
.lock().unwrap().get(&upid
.task_id
) {
106 worker
.request_abort();
110 commando_sock
.register_command("worker-task-status".into(), move |args
| {
111 let upid
= get_upid(args
)?
;
113 let active
= WORKER_TASK_LIST
.lock().unwrap().contains_key(&upid
.task_id
);
121 pub fn abort_worker_async(upid
: UPID
) {
122 tokio
::spawn(async
move {
123 if let Err(err
) = abort_worker(upid
).await
{
124 eprintln
!("abort worker failed - {}", err
);
129 pub async
fn abort_worker(upid
: UPID
) -> Result
<(), Error
> {
131 let sock
= server
::ctrl_sock_from_pid(upid
.pid
);
133 "command": "worker-task-abort",
135 "upid": upid
.to_string(),
138 super::send_command(sock
, &cmd
).map_ok(|_
| ()).await
141 fn parse_worker_status_line(line
: &str) -> Result
<(String
, UPID
, Option
<TaskState
>), Error
> {
143 let data
= line
.splitn(3, ' '
).collect
::<Vec
<&str>>();
145 let len
= data
.len();
148 1 => Ok((data
[0].to_owned(), data
[0].parse
::<UPID
>()?
, None
)),
150 let endtime
= i64::from_str_radix(data
[1], 16)?
;
151 let state
= TaskState
::from_endtime_and_message(endtime
, data
[2])?
;
152 Ok((data
[0].to_owned(), data
[0].parse
::<UPID
>()?
, Some(state
)))
154 _
=> bail
!("wrong number of components"),
158 /// Create task log directory with correct permissions
159 pub fn create_task_log_dirs() -> Result
<(), Error
> {
162 let backup_user
= crate::backup
::backup_user()?
;
163 let opts
= CreateOptions
::new()
164 .owner(backup_user
.uid
)
165 .group(backup_user
.gid
);
167 create_path(pbs_buildcfg
::PROXMOX_BACKUP_LOG_DIR
, None
, Some(opts
.clone()))?
;
168 create_path(PROXMOX_BACKUP_TASK_DIR
, None
, Some(opts
.clone()))?
;
169 create_path(pbs_buildcfg
::PROXMOX_BACKUP_RUN_DIR
, None
, Some(opts
))?
;
171 }).map_err(|err
: Error
| format_err
!("unable to create task log dir - {}", err
))?
;
176 /// Read endtime (time of last log line) and exitstatus from task log file
177 /// If there is not a single line with at valid datetime, we assume the
178 /// starttime to be the endtime
179 pub fn upid_read_status(upid
: &UPID
) -> Result
<TaskState
, Error
> {
181 let mut status
= TaskState
::Unknown { endtime: upid.starttime }
;
183 let path
= upid
.log_path();
185 let mut file
= File
::open(path
)?
;
187 /// speedup - only read tail
189 use std
::io
::SeekFrom
;
190 let _
= file
.seek(SeekFrom
::End(-8192)); // ignore errors
192 let mut data
= Vec
::with_capacity(8192);
193 file
.read_to_end(&mut data
)?
;
195 // strip newlines at the end of the task logs
196 while data
.last() == Some(&b'
\n'
) {
200 let last_line
= match data
.iter().rposition(|c
| *c
== b'
\n'
) {
201 Some(start
) if data
.len() > (start
+1) => &data
[start
+1..],
202 Some(_
) => &data
, // should not happen, since we removed all trailing newlines
206 let last_line
= std
::str::from_utf8(last_line
)
207 .map_err(|err
| format_err
!("upid_read_status: utf8 parse failed: {}", err
))?
;
209 let mut iter
= last_line
.splitn(2, ": ");
210 if let Some(time_str
) = iter
.next() {
211 if let Ok(endtime
) = proxmox
::tools
::time
::parse_rfc3339(time_str
) {
212 // set the endtime even if we cannot parse the state
213 status
= TaskState
::Unknown { endtime }
;
214 if let Some(rest
) = iter
.next().and_then(|rest
| rest
.strip_prefix("TASK ")) {
215 if let Ok(state
) = TaskState
::from_endtime_and_message(endtime
, rest
) {
/// Final state of a task, together with the time it ended (`endtime`).
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskState {
    /// The Task ended with an undefined state
    Unknown { endtime: i64 },
    /// The Task ended and there were no errors or warnings
    OK { endtime: i64 },
    /// The Task had 'count' amount of warnings and no errors
    Warning { count: u64, endtime: i64 },
    /// The Task ended with the error described in 'message'
    Error { message: String, endtime: i64 },
}
239 pub fn endtime(&self) -> i64 {
241 TaskState
::Unknown { endtime }
=> endtime
,
242 TaskState
::OK { endtime }
=> endtime
,
243 TaskState
::Warning { endtime, .. }
=> endtime
,
244 TaskState
::Error { endtime, .. }
=> endtime
,
248 pub fn tasktype(&self) -> TaskStateType
{
250 TaskState
::OK { .. }
=> TaskStateType
::OK
,
251 TaskState
::Unknown { .. }
=> TaskStateType
::Unknown
,
252 TaskState
::Error { .. }
=> TaskStateType
::Error
,
253 TaskState
::Warning { .. }
=> TaskStateType
::Warning
,
257 fn result_text(&self) -> String
{
259 TaskState
::Error { message, .. }
=> format
!("TASK ERROR: {}", message
),
260 other
=> format
!("TASK {}", other
),
264 fn from_endtime_and_message(endtime
: i64, s
: &str) -> Result
<Self, Error
> {
266 Ok(TaskState
::Unknown { endtime }
)
267 } else if s
== "OK" {
268 Ok(TaskState
::OK { endtime }
)
269 } else if let Some(warnings
) = s
.strip_prefix("WARNINGS: ") {
270 let count
: u64 = warnings
.parse()?
;
271 Ok(TaskState
::Warning{ count, endtime }
)
272 } else if !s
.is_empty() {
273 let message
= if let Some(err
) = s
.strip_prefix("ERROR: ") { err }
else { s }
.to_string();
274 Ok(TaskState
::Error{ message, endtime }
)
276 bail
!("unable to parse Task Status '{}'", s
);
281 impl std
::cmp
::PartialOrd
for TaskState
{
282 fn partial_cmp(&self, other
: &Self) -> Option
<std
::cmp
::Ordering
> {
283 Some(self.endtime().cmp(&other
.endtime()))
287 impl std
::cmp
::Ord
for TaskState
{
288 fn cmp(&self, other
: &Self) -> std
::cmp
::Ordering
{
289 self.endtime().cmp(&other
.endtime())
293 impl std
::fmt
::Display
for TaskState
{
294 fn fmt(&self, f
: &mut std
::fmt
::Formatter
<'_
>) -> std
::fmt
::Result
{
296 TaskState
::Unknown { .. }
=> write
!(f
, "unknown"),
297 TaskState
::OK { .. }
=> write
!(f
, "OK"),
298 TaskState
::Warning { count, .. }
=> write
!(f
, "WARNINGS: {}", count
),
299 TaskState
::Error { message, .. }
=> write
!(f
, "{}", message
),
/// Task details including parsed UPID
///
/// If there is no `state`, the task is still running.
#[derive(Debug)]
pub struct TaskListInfo {
    /// The parsed UPID
    pub upid: UPID,
    /// UPID string representation
    pub upid_str: String,
    /// Task `(endtime, status)` if already finished
    pub state: Option<TaskState>, // endtime, status
}
317 impl Into
<pbs_api_types
::TaskListItem
> for TaskListInfo
{
318 fn into(self) -> pbs_api_types
::TaskListItem
{
319 let (endtime
, status
) = self
321 .map_or_else(|| (None
, None
), |a
| (Some(a
.endtime()), Some(a
.to_string())));
323 pbs_api_types
::TaskListItem
{
325 node
: "localhost".to_string(),
326 pid
: self.upid
.pid
as i64,
327 pstart
: self.upid
.pstart
,
328 starttime
: self.upid
.starttime
,
329 worker_type
: self.upid
.worker_type
,
330 worker_id
: self.upid
.worker_id
,
331 user
: self.upid
.auth_id
,
/// Open the lock file guarding the task list files.
///
/// `exclusive` is passed on to `open_backup_lockfile`; writers in this
/// file pass `true`, readers `false`. The lock is held as long as the
/// returned guard is alive.
fn lock_task_list_files(exclusive: bool) -> Result<BackupLockGuard, Error> {
    open_backup_lockfile(PROXMOX_BACKUP_TASK_LOCK_FN, None, exclusive)
}
342 /// checks if the Task Archive is bigger that 'size_threshold' bytes, and
343 /// rotates it if it is
344 pub fn rotate_task_log_archive(size_threshold
: u64, compress
: bool
, max_files
: Option
<usize>) -> Result
<bool
, Error
> {
345 let _lock
= lock_task_list_files(true)?
;
347 let mut logrotate
= LogRotate
::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN
, compress
)
348 .ok_or_else(|| format_err
!("could not get archive file names"))?
;
350 logrotate
.rotate(size_threshold
, None
, max_files
)
// atomically read/update the task list, update status of finished tasks
// new_upid is added to the list when specified.
//
// Steps, all performed while holding the exclusive task list lock:
//  1. read the legacy index file (if any) into the list of finished tasks
//  2. split the active file into still running and finished tasks
//  3. rewrite the active file (including `new_upid`)
//  4. append the finished tasks, sorted by end time, to the archive
//  5. remove the legacy index file
fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {

    let backup_user = crate::backup::backup_user()?;

    let lock = lock_task_list_files(true)?;

    // TODO remove with 1.x
    let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
    let had_index_file = !finish_list.is_empty();

    // We use filter_map because one negative case wants to *move* the data into `finish_list`,
    // clippy doesn't quite catch this!
    #[allow(clippy::unnecessary_filter_map)]
    let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
        .into_iter()
        .filter_map(|info| {
            if info.state.is_some() {
                // this can happen when the active file still includes finished tasks
                finish_list.push(info);
                return None;
            }

            if !worker_is_active_local(&info.upid) {
                // println!("Detected stopped task '{}'", &info.upid_str);
                // fall back to "unknown, ended now" if the task log cannot be read
                let now = proxmox::tools::time::epoch_i64();
                let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
                finish_list.push(TaskListInfo {
                    upid: info.upid,
                    upid_str: info.upid_str,
                    state: Some(status)
                });
                return None;
            }

            Some(info)
        }).collect();

    if let Some(upid) = new_upid {
        active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
    }

    let active_raw = render_task_list(&active_list);

    replace_file(
        PROXMOX_BACKUP_ACTIVE_TASK_FN,
        active_raw.as_bytes(),
        CreateOptions::new()
            .owner(backup_user.uid)
            .group(backup_user.gid),
    )?;

    // oldest end time first; entries without state sort last, by start time
    finish_list.sort_unstable_by(|a, b| {
        match (&a.state, &b.state) {
            (Some(s1), Some(s2)) => s1.cmp(&s2),
            (Some(_), None) => std::cmp::Ordering::Less,
            (None, Some(_)) => std::cmp::Ordering::Greater,
            _ => a.upid.starttime.cmp(&b.upid.starttime),
        }
    });

    if !finish_list.is_empty() {
        match std::fs::OpenOptions::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN) {
            Ok(mut writer) => {
                for info in &finish_list {
                    writer.write_all(render_task_line(&info).as_bytes())?;
                }
            },
            Err(err) => bail!("could not write task archive - {}", err),
        }

        // the archive may have just been created, so fix up its ownership
        nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
    }

    // TODO Remove with 1.x
    // for compatibility, if we had an INDEX file, we do not need it anymore
    if had_index_file {
        let _ = nix::unistd::unlink(PROXMOX_BACKUP_INDEX_TASK_FN);
    }

    drop(lock);

    Ok(())
}
439 fn render_task_line(info
: &TaskListInfo
) -> String
{
440 let mut raw
= String
::new();
441 if let Some(status
) = &info
.state
{
442 raw
.push_str(&format
!("{} {:08X} {}\n", info
.upid_str
, status
.endtime(), status
));
444 raw
.push_str(&info
.upid_str
);
451 fn render_task_list(list
: &[TaskListInfo
]) -> String
{
452 let mut raw
= String
::new();
454 raw
.push_str(&render_task_line(&info
));
459 // note this is not locked, caller has to make sure it is
460 // this will skip (and log) lines that are not valid status lines
461 fn read_task_file
<R
: Read
>(reader
: R
) -> Result
<Vec
<TaskListInfo
>, Error
>
463 let reader
= BufReader
::new(reader
);
464 let mut list
= Vec
::new();
465 for line
in reader
.lines() {
467 match parse_worker_status_line(&line
) {
468 Ok((upid_str
, upid
, state
)) => list
.push(TaskListInfo
{
474 eprintln
!("unable to parse worker status '{}' - {}", line
, err
);
483 // note this is not locked, caller has to make sure it is
484 fn read_task_file_from_path
<P
>(path
: P
) -> Result
<Vec
<TaskListInfo
>, Error
>
486 P
: AsRef
<std
::path
::Path
> + std
::fmt
::Debug
,
488 let file
= match File
::open(&path
) {
490 Err(err
) if err
.kind() == std
::io
::ErrorKind
::NotFound
=> return Ok(Vec
::new()),
491 Err(err
) => bail
!("unable to open task list {:?} - {}", path
, err
),
497 pub struct TaskListInfoIterator
{
498 list
: VecDeque
<TaskListInfo
>,
500 archive
: Option
<LogRotateFiles
>,
501 lock
: Option
<BackupLockGuard
>,
504 impl TaskListInfoIterator
{
505 pub fn new(active_only
: bool
) -> Result
<Self, Error
> {
506 let (read_lock
, active_list
) = {
507 let lock
= lock_task_list_files(false)?
;
508 let active_list
= read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN
)?
;
510 let needs_update
= active_list
512 .any(|info
| info
.state
.is_some() || !worker_is_active_local(&info
.upid
));
514 // TODO remove with 1.x
515 let index_exists
= std
::path
::Path
::new(PROXMOX_BACKUP_INDEX_TASK_FN
).is_file();
517 if needs_update
|| index_exists
{
519 update_active_workers(None
)?
;
520 let lock
= lock_task_list_files(false)?
;
521 let active_list
= read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN
)?
;
528 let archive
= if active_only
{
531 let logrotate
= LogRotate
::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN
, true)
532 .ok_or_else(|| format_err
!("could not get archive file names"))?
;
533 Some(logrotate
.files())
536 let lock
= if active_only { None }
else { Some(read_lock) }
;
539 list
: active_list
.into(),
547 impl Iterator
for TaskListInfoIterator
{
548 type Item
= Result
<TaskListInfo
, Error
>;
550 fn next(&mut self) -> Option
<Self::Item
> {
552 if let Some(element
) = self.list
.pop_back() {
553 return Some(Ok(element
));
557 if let Some(mut archive
) = self.archive
.take() {
558 if let Some(file
) = archive
.next() {
559 let list
= match read_task_file(file
) {
561 Err(err
) => return Some(Err(err
)),
563 self.list
.append(&mut list
.into());
564 self.archive
= Some(archive
);
/// Launch long running worker tasks.
///
/// A worker task can either be a whole thread, or a simply tokio
/// task/future. Each task can `log()` messages, which are stored
/// persistently to files. Task should poll the `abort_requested`
/// flag, and stop execution when requested.
#[derive(Debug)]
pub struct WorkerTask {
    /// Unique ID of this task
    upid: UPID,
    /// Mutable task state (logger, progress, warning count, abort listeners)
    data: Mutex<WorkerTaskData>,
    /// Set once an abort of this task was requested
    abort_requested: AtomicBool,
}
589 impl std
::fmt
::Display
for WorkerTask
{
591 fn fmt(&self, f
: &mut std
::fmt
::Formatter
) -> std
::fmt
::Result
{
/// Mutable part of a `WorkerTask`, protected by the mutex in `WorkerTask::data`.
#[derive(Debug)]
struct WorkerTaskData {
    /// Logger writing to the task log file
    logger: FileLogger,
    progress: f64, // 0..1
    /// Number of warnings logged so far
    warn_count: u64,
    /// Channels notified (and removed) when an abort is requested
    pub abort_listeners: Vec<oneshot::Sender<()>>,
}
/// Create and register a new worker task.
///
/// Creates the task log file below the task directory (in a subdirectory
/// named after the low byte of the process start time), registers the
/// worker in the process-local task list and adds it to the active task
/// file.
pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: Authid, to_stdout: bool) -> Result<Arc<Self>, Error> {
    let upid = UPID::new(worker_type, worker_id, auth_id)?;
    let task_id = upid.task_id;

    let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR);

    path.push(format!("{:02X}", upid.pstart & 255));

    let backup_user = crate::backup::backup_user()?;

    create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?;

    path.push(upid.to_string());

    // NOTE(review): the option fields below could not be checked against the
    // reviewed excerpt - confirm them before applying this edit.
    let logger_options = FileLogOptions {
        to_stdout,
        exclusive: true,
        prefix_time: true,
        read: true,
        ..Default::default()
    };
    let logger = FileLogger::new(&path, logger_options)?;
    // hand the freshly created log file over to the backup user
    nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;

    let worker = Arc::new(Self {
        upid: upid.clone(),
        abort_requested: AtomicBool::new(false),
        data: Mutex::new(WorkerTaskData {
            logger,
            progress: 0.0,
            warn_count: 0,
            abort_listeners: vec![],
        }),
    });

    // scope to drop the lock again after inserting
    {
        let mut hash = WORKER_TASK_LIST.lock().unwrap();
        hash.insert(task_id, worker.clone());
        super::set_worker_count(hash.len());
    }

    update_active_workers(Some(&upid))?;

    Ok(worker)
}
653 /// Spawn a new tokio task/future.
656 worker_id
: Option
<String
>,
660 ) -> Result
<String
, Error
>
661 where F
: Send
+ '
static + FnOnce(Arc
<WorkerTask
>) -> T
,
662 T
: Send
+ '
static + Future
<Output
= Result
<(), Error
>>,
664 let worker
= WorkerTask
::new(worker_type
, worker_id
, auth_id
, to_stdout
)?
;
665 let upid_str
= worker
.upid
.to_string();
666 let f
= f(worker
.clone());
667 tokio
::spawn(async
move {
668 let result
= f
.await
;
669 worker
.log_result(&result
);
675 /// Create a new worker thread.
676 pub fn new_thread
<F
>(
678 worker_id
: Option
<String
>,
682 ) -> Result
<String
, Error
>
683 where F
: Send
+ UnwindSafe
+ '
static + FnOnce(Arc
<WorkerTask
>) -> Result
<(), Error
>
685 let worker
= WorkerTask
::new(worker_type
, worker_id
, auth_id
, to_stdout
)?
;
686 let upid_str
= worker
.upid
.to_string();
688 let _child
= std
::thread
::Builder
::new().name(upid_str
.clone()).spawn(move || {
689 let worker1
= worker
.clone();
690 let result
= match std
::panic
::catch_unwind(move || f(worker1
)) {
693 match panic
.downcast
::<&str>() {
695 Err(format_err
!("worker panicked: {}", panic_msg
))
698 Err(format_err
!("worker panicked: unknown type."))
704 worker
.log_result(&result
);
710 /// create state from self and a result
711 pub fn create_state(&self, result
: &Result
<(), Error
>) -> TaskState
{
712 let warn_count
= self.data
.lock().unwrap().warn_count
;
714 let endtime
= proxmox
::tools
::time
::epoch_i64();
716 if let Err(err
) = result
{
717 TaskState
::Error { message: err.to_string(), endtime }
718 } else if warn_count
> 0 {
719 TaskState
::Warning { count: warn_count, endtime }
721 TaskState
::OK { endtime }
/// Log task result, remove task from running list
///
/// The task is first removed from the process-local list, so the
/// following update of the task list files sees it as finished. Errors of
/// that update are ignored.
pub fn log_result(&self, result: &Result<(), Error>) {
    let state = self.create_state(result);
    self.log(state.result_text());

    WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
    let _ = update_active_workers(None);
    super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
}
736 pub fn log
<S
: AsRef
<str>>(&self, msg
: S
) {
737 let mut data
= self.data
.lock().unwrap();
738 data
.logger
.log(msg
);
741 /// Log a message as warning.
742 pub fn warn
<S
: AsRef
<str>>(&self, msg
: S
) {
743 let mut data
= self.data
.lock().unwrap();
744 data
.logger
.log(format
!("WARN: {}", msg
.as_ref()));
745 data
.warn_count
+= 1;
748 /// Set progress indicator
749 pub fn progress(&self, progress
: f64) {
750 if progress
>= 0.0 && progress
<= 1.0 {
751 let mut data
= self.data
.lock().unwrap();
752 data
.progress
= progress
;
754 // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
759 pub fn request_abort(&self) {
760 eprintln
!("set abort flag for worker {}", self.upid
);
762 let prev_abort
= self.abort_requested
.swap(true, Ordering
::SeqCst
);
763 if !prev_abort
{ // log abort one time
764 self.log(format
!("received abort request ..."));
767 let mut data
= self.data
.lock().unwrap();
769 match data
.abort_listeners
.pop() {
772 let _
= ch
.send(()); // ignore errors here
/// Test if abort was requested.
///
/// Reads the abort flag that `request_abort` sets.
pub fn abort_requested(&self) -> bool {
    self.abort_requested.load(Ordering::SeqCst)
}
783 /// Fail if abort was requested.
784 pub fn fail_on_abort(&self) -> Result
<(), Error
> {
785 if self.abort_requested() {
786 bail
!("abort requested - aborting task");
791 /// Get a future which resolves on task abort
792 pub fn abort_future(&self) -> oneshot
::Receiver
<()> {
793 let (tx
, rx
) = oneshot
::channel
::<()>();
795 let mut data
= self.data
.lock().unwrap();
796 if self.abort_requested() {
799 data
.abort_listeners
.push(tx
);
/// Returns the UPID of this task.
pub fn upid(&self) -> &UPID {
    &self.upid
}
809 impl pbs_datastore
::task
::TaskState
for WorkerTask
{
810 fn check_abort(&self) -> Result
<(), Error
> {
814 fn log(&self, level
: log
::Level
, message
: &std
::fmt
::Arguments
) {
816 log
::Level
::Error
=> self.warn(&message
.to_string()),
817 log
::Level
::Warn
=> self.warn(&message
.to_string()),
818 log
::Level
::Info
=> self.log(&message
.to_string()),
819 log
::Level
::Debug
=> self.log(&format
!("DEBUG: {}", message
)),
820 log
::Level
::Trace
=> self.log(&format
!("TRACE: {}", message
)),
/// Wait for a locally spawned worker task
827 /// Note: local workers should print logs to stdout, so there is no
828 /// need to fetch/display logs. We just wait for the worker to finish.
829 pub async
fn wait_for_local_worker(upid_str
: &str) -> Result
<(), Error
> {
831 let upid
: UPID
= upid_str
.parse()?
;
833 let sleep_duration
= core
::time
::Duration
::new(0, 100_000_000);
836 if worker_is_active_local(&upid
) {
837 tokio
::time
::sleep(sleep_duration
).await
;