use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::path::PathBuf;
use std::io::{Read, Write, BufRead, BufReader};
use std::panic::UnwindSafe;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{bail, format_err, Error};
use futures::*;
use lazy_static::lazy_static;
use serde_json::{json, Value};
use serde::{Serialize, Deserialize};
use tokio::sync::oneshot;
use nix::fcntl::OFlag;
use once_cell::sync::OnceCell;

use proxmox::sys::linux::procfs;
use proxmox::try_block;
use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
use proxmox::api::upid::UPID;

use pbs_tools::logrotate::{LogRotate, LogRotateFiles};

use crate::{CommandoSocket, FileLogger, FileLogOptions};

struct TaskListLockGuard(File);

struct WorkerTaskSetup {
    file_opts: CreateOptions,
    taskdir: PathBuf,
    task_lock_fn: PathBuf,
    active_tasks_fn: PathBuf,
    task_index_fn: PathBuf,
    task_archive_fn: PathBuf,
}

static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();

fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
    WORKER_TASK_SETUP.get()
        .ok_or_else(|| format_err!("WorkerTask library is not initialized"))
}

impl WorkerTaskSetup {

    fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {

        let mut taskdir = basedir.clone();
        taskdir.push("tasks");

        let mut task_lock_fn = taskdir.clone();
        task_lock_fn.push(".active.lock");

        let mut active_tasks_fn = taskdir.clone();
        active_tasks_fn.push("active");

        let mut task_index_fn = taskdir.clone();
        task_index_fn.push("index");

        let mut task_archive_fn = taskdir.clone();
        task_archive_fn.push("archive");

        Self {
            file_opts,
            taskdir,
            task_lock_fn,
            active_tasks_fn,
            task_index_fn,
            task_archive_fn,
        }
    }

    fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {
        let options = self.file_opts.clone()
            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));

        let timeout = std::time::Duration::new(10, 0);

        let file = proxmox::tools::fs::open_file_locked(
            &self.task_lock_fn,
            timeout,
            exclusive,
            options,
        )?;

        Ok(TaskListLockGuard(file))
    }

    fn log_path(&self, upid: &UPID) -> std::path::PathBuf {
        let mut path = self.taskdir.clone();
        path.push(format!("{:02X}", upid.pstart % 256));
        path.push(upid.to_string());
        path
    }

    // atomically read/update the task list, update status of finished tasks
    // new_upid is added to the list when specified.
    fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {

        let lock = self.lock_task_list_files(true)?;

        // TODO remove with 1.x
        let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?;
        let had_index_file = !finish_list.is_empty();

        // We use filter_map because one negative case wants to *move* the data into `finish_list`,
        // clippy doesn't quite catch this!
        #[allow(clippy::unnecessary_filter_map)]
        let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)?
            .into_iter()
            .filter_map(|info| {
                if info.state.is_some() {
                    // this can happen when the active file still includes finished tasks
                    finish_list.push(info);
                    return None;
                }

                if !worker_is_active_local(&info.upid) {
                    // println!("Detected stopped task '{}'", &info.upid_str);
                    let now = proxmox::tools::time::epoch_i64();
                    let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
                    finish_list.push(TaskListInfo {
                        upid: info.upid,
                        upid_str: info.upid_str,
                        state: Some(status)
                    });
                    return None;
                }

                Some(info)
            }).collect();

        if let Some(upid) = new_upid {
            active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
        }

        let active_raw = render_task_list(&active_list);

        let options = self.file_opts.clone()
            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));

        replace_file(
            &self.active_tasks_fn,
            active_raw.as_bytes(),
            options,
        )?;

        finish_list.sort_unstable_by(|a, b| {
            match (&a.state, &b.state) {
                (Some(s1), Some(s2)) => s1.cmp(&s2),
                (Some(_), None) => std::cmp::Ordering::Less,
                (None, Some(_)) => std::cmp::Ordering::Greater,
                _ => a.upid.starttime.cmp(&b.upid.starttime),
            }
        });

        if !finish_list.is_empty() {
            let options = self.file_opts.clone()
                .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));

            let mut writer = atomic_open_or_create_file(
                &self.task_archive_fn,
                OFlag::O_APPEND | OFlag::O_RDWR,
                &[],
                options,
            )?;
            for info in &finish_list {
                writer.write_all(render_task_line(&info).as_bytes())?;
            }
        }

        // TODO Remove with 1.x
        // for compatibility, if we had an INDEX file, we do not need it anymore
        if had_index_file {
            let _ = nix::unistd::unlink(&self.task_index_fn);
        }

        drop(lock);

        Ok(())
    }

    // Create task log directory with correct permissions
    fn create_task_log_dirs(&self) -> Result<(), Error> {

        try_block!({
            let dir_opts = self.file_opts.clone()
                .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));

            create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?;
            // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
            Ok(())
        }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
    }
}

/// Initialize the WorkerTask library
pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
    let setup = WorkerTaskSetup::new(basedir, file_opts);
    setup.create_task_log_dirs()?;
    WORKER_TASK_SETUP.set(setup)
        .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
}
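
// Illustrative sketch (not part of the original file): a server binary would call
// `init_worker_tasks()` exactly once at startup, before creating any `WorkerTask`.
// The base directory and the bare `CreateOptions::new()` below are assumptions made
// for this example, not values mandated by this crate.
//
//     use proxmox::tools::fs::CreateOptions;
//
//     let basedir = std::path::PathBuf::from("/var/log/example-daemon");
//     init_worker_tasks(basedir, CreateOptions::new())?;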

/// Checks if the task archive is bigger than 'size_threshold' bytes, and
/// rotates it if it is
pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {

    let setup = worker_task_setup()?;

    let _lock = setup.lock_task_list_files(true)?;

    let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress)
        .ok_or_else(|| format_err!("could not get archive file names"))?;

    logrotate.rotate(size_threshold, None, max_files)
}
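
// Illustrative sketch (not part of the original file): a periodic maintenance job
// could rotate the task archive like this. The size threshold and file count are
// example values, not defaults of this crate.
//
//     let rotated = rotate_task_log_archive(
//         512 * 1024, // size_threshold in bytes
//         true,       // compress rotated archives
//         Some(20),   // keep at most 20 archive files
//     )?;
//     if rotated {
//         eprintln!("task archive was rotated");
//     }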

/// Path to the worker log file
pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
    let setup = worker_task_setup()?;
    Ok(setup.log_path(upid))
}

/// Read endtime (time of last log line) and exitstatus from task log file
///
/// If there is not a single line with a valid datetime, we assume the
/// starttime to be the endtime
pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {

    let setup = worker_task_setup()?;

    let mut status = TaskState::Unknown { endtime: upid.starttime };

    let path = setup.log_path(upid);

    let mut file = File::open(path)?;

    // speedup - only read the tail of the log file
    use std::io::Seek;
    use std::io::SeekFrom;
    let _ = file.seek(SeekFrom::End(-8192)); // ignore errors

    let mut data = Vec::with_capacity(8192);
    file.read_to_end(&mut data)?;

    // strip newlines at the end of the task logs
    while data.last() == Some(&b'\n') {
        data.pop();
    }

    let last_line = match data.iter().rposition(|c| *c == b'\n') {
        Some(start) if data.len() > (start+1) => &data[start+1..],
        Some(_) => &data, // should not happen, since we removed all trailing newlines
        None => &data,
    };

    let last_line = std::str::from_utf8(last_line)
        .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;

    let mut iter = last_line.splitn(2, ": ");
    if let Some(time_str) = iter.next() {
        if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
            // set the endtime even if we cannot parse the state
            status = TaskState::Unknown { endtime };
            if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
                if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
                    status = state;
                }
            }
        }
    }

    Ok(status)
}
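
// Illustrative sketch (not part of the original file): given a UPID string (for
// example the return value of `WorkerTask::spawn`), the persisted result of a
// finished task can be looked up like this.
//
//     let upid: UPID = upid_str.parse()?;
//     match upid_read_status(&upid)? {
//         TaskState::OK { endtime } => println!("task finished OK at {}", endtime),
//         other => println!("task ended with: {}", other),
//     }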

lazy_static! {
    static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
}

/// checks if the task UPID refers to a worker from this process
fn is_local_worker(upid: &UPID) -> bool {
    upid.pid == crate::pid() && upid.pstart == crate::pstart()
}

/// Test if the task is still running
pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
    if is_local_worker(upid) {
        return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
    }

    if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() {
        return Ok(false);
    }

    let sock = crate::ctrl_sock_from_pid(upid.pid);
    let cmd = json!({
        "command": "worker-task-status",
        "args": {
            "upid": upid.to_string(),
        },
    });
    let status = crate::send_command(sock, &cmd).await?;

    if let Some(active) = status.as_bool() {
        Ok(active)
    } else {
        bail!("got unexpected result {:?} (expected bool)", status);
    }
}

/// Test if the task is still running (fast but inaccurate implementation)
///
/// If the task was spawned from a different process, we simply return whether
/// that process is still running. This information is good enough to detect
/// stale tasks...
pub fn worker_is_active_local(upid: &UPID) -> bool {
    if is_local_worker(upid) {
        WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
    } else {
        procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
    }
}

pub fn register_task_control_commands(
    commando_sock: &mut CommandoSocket,
) -> Result<(), Error> {
    fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
        let args = if let Some(args) = args { args } else { bail!("missing args") };
        let upid = match args.get("upid") {
            Some(Value::String(upid)) => upid.parse::<UPID>()?,
            None => bail!("no upid in args"),
            _ => bail!("unable to parse upid"),
        };
        if !is_local_worker(&upid) {
            bail!("upid does not belong to this process");
        }
        Ok(upid)
    }

    commando_sock.register_command("worker-task-abort".into(), move |args| {
        let upid = get_upid(args)?;

        abort_local_worker(upid);

        Ok(Value::Null)
    })?;
    commando_sock.register_command("worker-task-status".into(), move |args| {
        let upid = get_upid(args)?;

        let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id);

        Ok(active.into())
    })?;

    Ok(())
}

pub fn abort_worker_async(upid: UPID) {
    tokio::spawn(async move {
        if let Err(err) = abort_worker(upid).await {
            eprintln!("abort worker failed - {}", err);
        }
    });
}

pub async fn abort_worker(upid: UPID) -> Result<(), Error> {

    let sock = crate::ctrl_sock_from_pid(upid.pid);
    let cmd = json!({
        "command": "worker-task-abort",
        "args": {
            "upid": upid.to_string(),
        },
    });
    crate::send_command(sock, &cmd).map_ok(|_| ()).await
}

fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {

    let data = line.splitn(3, ' ').collect::<Vec<&str>>();

    let len = data.len();

    match len {
        1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)),
        3 => {
            let endtime = i64::from_str_radix(data[1], 16)?;
            let state = TaskState::from_endtime_and_message(endtime, data[2])?;
            Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state)))
        }
        _ => bail!("wrong number of components"),
    }
}

/// Task State
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskState {
    /// The Task ended with an undefined state
    Unknown { endtime: i64 },
    /// The Task ended and there were no errors or warnings
    OK { endtime: i64 },
    /// The Task finished with 'count' warnings and no errors
    Warning { count: u64, endtime: i64 },
    /// The Task ended with the error described in 'message'
    Error { message: String, endtime: i64 },
}

impl TaskState {
    pub fn endtime(&self) -> i64 {
        match *self {
            TaskState::Unknown { endtime } => endtime,
            TaskState::OK { endtime } => endtime,
            TaskState::Warning { endtime, .. } => endtime,
            TaskState::Error { endtime, .. } => endtime,
        }
    }

    fn result_text(&self) -> String {
        match self {
            TaskState::Error { message, .. } => format!("TASK ERROR: {}", message),
            other => format!("TASK {}", other),
        }
    }

    fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> {
        if s == "unknown" {
            Ok(TaskState::Unknown { endtime })
        } else if s == "OK" {
            Ok(TaskState::OK { endtime })
        } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") {
            let count: u64 = warnings.parse()?;
            Ok(TaskState::Warning { count, endtime })
        } else if !s.is_empty() {
            let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string();
            Ok(TaskState::Error { message, endtime })
        } else {
            bail!("unable to parse Task Status '{}'", s);
        }
    }
}

impl std::cmp::PartialOrd for TaskState {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.endtime().cmp(&other.endtime()))
    }
}

impl std::cmp::Ord for TaskState {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.endtime().cmp(&other.endtime())
    }
}

impl std::fmt::Display for TaskState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TaskState::Unknown { .. } => write!(f, "unknown"),
            TaskState::OK { .. } => write!(f, "OK"),
            TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count),
            TaskState::Error { message, .. } => write!(f, "{}", message),
        }
    }
}
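
// Illustrative note (not part of the original file): the status text produced by
// `result_text()`/`Display` and the text parsed by `from_endtime_and_message()`
// round-trip, which is what links the final "TASK ..." log line to a `TaskState`.
// The endtime below is an arbitrary example epoch.
//
//     let state = TaskState::from_endtime_and_message(1_600_000_000, "WARNINGS: 3")?;
//     assert_eq!(state, TaskState::Warning { count: 3, endtime: 1_600_000_000 });
//     assert_eq!(state.to_string(), "WARNINGS: 3");
//     // the corresponding task log line ends with: "TASK WARNINGS: 3"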

/// Task details including parsed UPID
///
/// If there is no `state`, the task is still running.
#[derive(Debug)]
pub struct TaskListInfo {
    /// The parsed UPID
    pub upid: UPID,
    /// UPID string representation
    pub upid_str: String,
    /// Task `(endtime, status)` if already finished
    pub state: Option<TaskState>, // endtime, status
}

fn render_task_line(info: &TaskListInfo) -> String {
    let mut raw = String::new();
    if let Some(status) = &info.state {
        raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status));
    } else {
        raw.push_str(&info.upid_str);
        raw.push('\n');
    }

    raw
}

fn render_task_list(list: &[TaskListInfo]) -> String {
    let mut raw = String::new();
    for info in list {
        raw.push_str(&render_task_line(&info));
    }
    raw
}
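
// Illustrative note (not part of the original file): `render_task_line()` and
// `parse_worker_status_line()` agree on a simple line-based format used in the
// `active` and `archive` files:
//
//     <upid string>                                            (task still running)
//     <upid string> <endtime as 8 hex digits> <status text>    (task finished)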

// note this is not locked, caller has to make sure it is
// this will skip (and log) lines that are not valid status lines
fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error>
{
    let reader = BufReader::new(reader);
    let mut list = Vec::new();
    for line in reader.lines() {
        let line = line?;
        match parse_worker_status_line(&line) {
            Ok((upid_str, upid, state)) => list.push(TaskListInfo {
                upid_str,
                upid,
                state
            }),
            Err(err) => {
                eprintln!("unable to parse worker status '{}' - {}", line, err);
                continue;
            }
        };
    }

    Ok(list)
}

// note this is not locked, caller has to make sure it is
fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error>
where
    P: AsRef<std::path::Path> + std::fmt::Debug,
{
    let file = match File::open(&path) {
        Ok(f) => f,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
        Err(err) => bail!("unable to open task list {:?} - {}", path, err),
    };

    read_task_file(file)
}

pub struct TaskListInfoIterator {
    list: VecDeque<TaskListInfo>,
    end: bool,
    archive: Option<LogRotateFiles>,
    lock: Option<TaskListLockGuard>,
}

impl TaskListInfoIterator {
    pub fn new(active_only: bool) -> Result<Self, Error> {

        let setup = worker_task_setup()?;

        let (read_lock, active_list) = {
            let lock = setup.lock_task_list_files(false)?;
            let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;

            let needs_update = active_list
                .iter()
                .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));

            // TODO remove with 1.x
            let index_exists = setup.task_index_fn.is_file();

            if needs_update || index_exists {
                drop(lock);
                setup.update_active_workers(None)?;
                let lock = setup.lock_task_list_files(false)?;
                let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
                (lock, active_list)
            } else {
                (lock, active_list)
            }
        };

        let archive = if active_only {
            None
        } else {
            let logrotate = LogRotate::new(&setup.task_archive_fn, true)
                .ok_or_else(|| format_err!("could not get archive file names"))?;
            Some(logrotate.files())
        };

        let lock = if active_only { None } else { Some(read_lock) };

        Ok(Self {
            list: active_list.into(),
            end: active_only,
            archive,
            lock,
        })
    }
}

impl Iterator for TaskListInfoIterator {
    type Item = Result<TaskListInfo, Error>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(element) = self.list.pop_back() {
                return Some(Ok(element));
            } else if self.end {
                return None;
            } else {
                if let Some(mut archive) = self.archive.take() {
                    if let Some(file) = archive.next() {
                        let list = match read_task_file(file) {
                            Ok(list) => list,
                            Err(err) => return Some(Err(err)),
                        };
                        self.list.append(&mut list.into());
                        self.archive = Some(archive);
                        continue;
                    }
                }

                self.end = true;
                self.lock.take();
            }
        }
    }
}
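
// Illustrative sketch (not part of the original file): listing all currently active
// tasks through the iterator. Passing `false` instead would also walk the (possibly
// compressed) archive files.
//
//     for item in TaskListInfoIterator::new(true)? {
//         let info = item?;
//         println!("{} is running", info.upid_str);
//     }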

/// Launch long running worker tasks.
///
/// A worker task can either be a whole thread, or simply a tokio
/// task/future. Each task can `log()` messages, which are stored
/// persistently to files. Tasks should poll the `abort_requested`
/// flag, and stop execution when requested.
pub struct WorkerTask {
    setup: &'static WorkerTaskSetup,
    upid: UPID,
    data: Mutex<WorkerTaskData>,
    abort_requested: AtomicBool,
}

impl std::fmt::Display for WorkerTask {

    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        self.upid.fmt(f)
    }
}

struct WorkerTaskData {
    logger: FileLogger,
    progress: f64, // 0..1
    warn_count: u64,
    pub abort_listeners: Vec<oneshot::Sender<()>>,
}

impl WorkerTask {

    pub fn new(
        worker_type: &str,
        worker_id: Option<String>,
        auth_id: String,
        to_stdout: bool,
    ) -> Result<Arc<Self>, Error> {

        let setup = worker_task_setup()?;

        let upid = UPID::new(worker_type, worker_id, auth_id)?;
        let task_id = upid.task_id;

        let mut path = setup.taskdir.clone();

        path.push(format!("{:02X}", upid.pstart & 255));

        let dir_opts = setup.file_opts.clone()
            .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));

        create_path(&path, None, Some(dir_opts))?;

        path.push(upid.to_string());

        let logger_options = FileLogOptions {
            to_stdout,
            exclusive: true,
            prefix_time: true,
            read: true,
            file_opts: setup.file_opts.clone(),
            ..Default::default()
        };
        let logger = FileLogger::new(&path, logger_options)?;

        let worker = Arc::new(Self {
            setup,
            upid: upid.clone(),
            abort_requested: AtomicBool::new(false),
            data: Mutex::new(WorkerTaskData {
                logger,
                progress: 0.0,
                warn_count: 0,
                abort_listeners: vec![],
            }),
        });

        // scope to drop the lock again after inserting
        {
            let mut hash = WORKER_TASK_LIST.lock().unwrap();
            hash.insert(task_id, worker.clone());
            crate::set_worker_count(hash.len());
        }

        setup.update_active_workers(Some(&upid))?;

        Ok(worker)
    }

    /// Spawn a new tokio task/future.
    pub fn spawn<F, T>(
        worker_type: &str,
        worker_id: Option<String>,
        auth_id: String,
        to_stdout: bool,
        f: F,
    ) -> Result<String, Error>
    where
        F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
        T: Send + 'static + Future<Output = Result<(), Error>>,
    {
        let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
        let upid_str = worker.upid.to_string();
        let f = f(worker.clone());
        tokio::spawn(async move {
            let result = f.await;
            worker.log_result(&result);
        });

        Ok(upid_str)
    }
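
    // Illustrative sketch (not part of the original file): spawning an async worker
    // from request handling code. The worker type, worker id and auth id are example
    // values chosen for this sketch.
    //
    //     let upid_str = WorkerTask::spawn(
    //         "exampletask",
    //         Some(String::from("job-1")),
    //         String::from("root@pam"),
    //         false,
    //         |worker| async move {
    //             worker.log("starting work");
    //             worker.fail_on_abort()?;
    //             worker.progress(0.5);
    //             worker.log("done");
    //             Ok(())
    //         },
    //     )?;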

    /// Create a new worker thread.
    pub fn new_thread<F>(
        worker_type: &str,
        worker_id: Option<String>,
        auth_id: String,
        to_stdout: bool,
        f: F,
    ) -> Result<String, Error>
    where
        F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>,
    {
        let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
        let upid_str = worker.upid.to_string();

        let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || {
            let worker1 = worker.clone();
            let result = match std::panic::catch_unwind(move || f(worker1)) {
                Ok(r) => r,
                Err(panic) => {
                    match panic.downcast::<&str>() {
                        Ok(panic_msg) => {
                            Err(format_err!("worker panicked: {}", panic_msg))
                        }
                        Err(_) => {
                            Err(format_err!("worker panicked: unknown type."))
                        }
                    }
                }
            };

            worker.log_result(&result);
        });

        Ok(upid_str)
    }

    /// create state from self and a result
    pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
        let warn_count = self.data.lock().unwrap().warn_count;

        let endtime = proxmox::tools::time::epoch_i64();

        if let Err(err) = result {
            TaskState::Error { message: err.to_string(), endtime }
        } else if warn_count > 0 {
            TaskState::Warning { count: warn_count, endtime }
        } else {
            TaskState::OK { endtime }
        }
    }

    /// Log task result, remove task from running list
    pub fn log_result(&self, result: &Result<(), Error>) {
        let state = self.create_state(result);
        self.log(state.result_text());

        WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
        let _ = self.setup.update_active_workers(None);
        crate::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
    }

    /// Log a message.
    pub fn log<S: AsRef<str>>(&self, msg: S) {
        let mut data = self.data.lock().unwrap();
        data.logger.log(msg);
    }

    /// Log a message as warning.
    pub fn warn<S: AsRef<str>>(&self, msg: S) {
        let mut data = self.data.lock().unwrap();
        data.logger.log(format!("WARN: {}", msg.as_ref()));
        data.warn_count += 1;
    }

    /// Set progress indicator
    pub fn progress(&self, progress: f64) {
        if progress >= 0.0 && progress <= 1.0 {
            let mut data = self.data.lock().unwrap();
            data.progress = progress;
        } else {
            // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
        }
    }

    /// Request abort
    pub fn request_abort(&self) {
        eprintln!("set abort flag for worker {}", self.upid);

        let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst);
        if !prev_abort { // log abort only once
            self.log(format!("received abort request ..."));
        }
        // notify listeners
        let mut data = self.data.lock().unwrap();
        loop {
            match data.abort_listeners.pop() {
                None => { break; },
                Some(ch) => {
                    let _ = ch.send(()); // ignore errors here
                },
            }
        }
    }

    /// Test if abort was requested.
    pub fn abort_requested(&self) -> bool {
        self.abort_requested.load(Ordering::SeqCst)
    }

    /// Fail if abort was requested.
    pub fn fail_on_abort(&self) -> Result<(), Error> {
        if self.abort_requested() {
            bail!("abort requested - aborting task");
        }
        Ok(())
    }

    /// Get a future which resolves on task abort
    pub fn abort_future(&self) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel::<()>();

        let mut data = self.data.lock().unwrap();
        if self.abort_requested() {
            let _ = tx.send(());
        } else {
            data.abort_listeners.push(tx);
        }
        rx
    }

    pub fn upid(&self) -> &UPID {
        &self.upid
    }
}
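
// Illustrative sketch (not part of the original file): inside a worker future, the
// abort future can be raced against the actual work (here a hypothetical `do_work()`
// helper) with `tokio::select!`, so the task reacts promptly to `request_abort()`.
//
//     tokio::select! {
//         _ = worker.abort_future() => {
//             worker.log("task aborted");
//         }
//         res = do_work() => {
//             res?;
//         }
//     }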

impl pbs_tools::task::TaskState for WorkerTask {
    fn check_abort(&self) -> Result<(), Error> {
        self.fail_on_abort()
    }

    fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
        match level {
            log::Level::Error => self.warn(&message.to_string()),
            log::Level::Warn => self.warn(&message.to_string()),
            log::Level::Info => self.log(&message.to_string()),
            log::Level::Debug => self.log(&format!("DEBUG: {}", message)),
            log::Level::Trace => self.log(&format!("TRACE: {}", message)),
        }
    }
}

/// Wait for a locally spawned worker task
///
/// Note: local workers should print logs to stdout, so there is no
/// need to fetch/display logs. We just wait for the worker to finish.
pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {

    let upid: UPID = upid_str.parse()?;

    let sleep_duration = core::time::Duration::new(0, 100_000_000);

    loop {
        if worker_is_active_local(&upid) {
            tokio::time::sleep(sleep_duration).await;
        } else {
            break;
        }
    }
    Ok(())
}
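
// Illustrative sketch (not part of the original file): a CLI command could run a
// thread-based worker with `to_stdout = true` and then wait for it to finish. The
// worker type and auth id are example values.
//
//     let upid_str = WorkerTask::new_thread(
//         "exampletask",
//         None,
//         String::from("root@pam"),
//         true, // print the task log to stdout for CLI usage
//         |worker| {
//             worker.log("working...");
//             Ok(())
//         },
//     )?;
//     wait_for_local_worker(&upid_str).await?;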

/// Request abort of a local worker (if existing and running)
pub fn abort_local_worker(upid: UPID) {
    if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) {
        worker.request_abort();
    }
}