]> git.proxmox.com Git - proxmox-backup.git/blob - src/server/worker_task.rs
split out pbs-buildcfg module
[proxmox-backup.git] / src / server / worker_task.rs
1 use std::collections::{HashMap, VecDeque};
2 use std::fs::File;
3 use std::io::{Read, Write, BufRead, BufReader};
4 use std::panic::UnwindSafe;
5 use std::sync::atomic::{AtomicBool, Ordering};
6 use std::sync::{Arc, Mutex};
7
8 use anyhow::{bail, format_err, Error};
9 use futures::*;
10 use lazy_static::lazy_static;
11 use serde_json::{json, Value};
12 use serde::{Serialize, Deserialize};
13 use tokio::sync::oneshot;
14
15 use proxmox::sys::linux::procfs;
16 use proxmox::try_block;
17 use proxmox::tools::fs::{create_path, open_file_locked, replace_file, CreateOptions};
18
19 use super::UPID;
20
21 use pbs_buildcfg;
22
23 use crate::server;
24 use crate::tools::logrotate::{LogRotate, LogRotateFiles};
25 use crate::tools::{FileLogger, FileLogOptions};
26 use crate::api2::types::{Authid, TaskStateType};
27
28 macro_rules! taskdir {
29 ($subdir:expr) => (concat!(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
30 }
31 pub const PROXMOX_BACKUP_TASK_DIR: &str = taskdir!("/");
32 pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = taskdir!("/.active.lock");
33 pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = taskdir!("/active");
34 pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = taskdir!("/index");
35 pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive");
36
37 lazy_static! {
38 static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
39 }
40
41 /// checks if the task UPID refers to a worker from this process
42 fn is_local_worker(upid: &UPID) -> bool {
43 upid.pid == server::pid() && upid.pstart == server::pstart()
44 }
45
46 /// Test if the task is still running
47 pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
48 if is_local_worker(upid) {
49 return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
50 }
51
52 if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() {
53 return Ok(false);
54 }
55
56 let sock = server::ctrl_sock_from_pid(upid.pid);
57 let cmd = json!({
58 "command": "worker-task-status",
59 "args": {
60 "upid": upid.to_string(),
61 },
62 });
63 let status = super::send_command(sock, &cmd).await?;
64
65 if let Some(active) = status.as_bool() {
66 Ok(active)
67 } else {
68 bail!("got unexpected result {:?} (expected bool)", status);
69 }
70 }
71
72 /// Test if the task is still running (fast but inaccurate implementation)
73 ///
74 /// If the task is spawned from a different process, we simply return if
75 /// that process is still running. This information is good enough to detect
76 /// stale tasks...
77 pub fn worker_is_active_local(upid: &UPID) -> bool {
78 if is_local_worker(upid) {
79 WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
80 } else {
81 procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
82 }
83 }
84
85 pub fn register_task_control_commands(
86 commando_sock: &mut super::CommandoSocket,
87 ) -> Result<(), Error> {
88 fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
89 let args = if let Some(args) = args { args } else { bail!("missing args") };
90 let upid = match args.get("upid") {
91 Some(Value::String(upid)) => upid.parse::<UPID>()?,
92 None => bail!("no upid in args"),
93 _ => bail!("unable to parse upid"),
94 };
95 if !is_local_worker(&upid) {
96 bail!("upid does not belong to this process");
97 }
98 Ok(upid)
99 }
100
101 commando_sock.register_command("worker-task-abort".into(), move |args| {
102 let upid = get_upid(args)?;
103
104 if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) {
105 worker.request_abort();
106 }
107 Ok(Value::Null)
108 })?;
109 commando_sock.register_command("worker-task-status".into(), move |args| {
110 let upid = get_upid(args)?;
111
112 let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id);
113
114 Ok(active.into())
115 })?;
116
117 Ok(())
118 }
119
120 pub fn abort_worker_async(upid: UPID) {
121 tokio::spawn(async move {
122 if let Err(err) = abort_worker(upid).await {
123 eprintln!("abort worker failed - {}", err);
124 }
125 });
126 }
127
128 pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
129
130 let sock = server::ctrl_sock_from_pid(upid.pid);
131 let cmd = json!({
132 "command": "worker-task-abort",
133 "args": {
134 "upid": upid.to_string(),
135 },
136 });
137 super::send_command(sock, &cmd).map_ok(|_| ()).await
138 }
139
140 fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
141
142 let data = line.splitn(3, ' ').collect::<Vec<&str>>();
143
144 let len = data.len();
145
146 match len {
147 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)),
148 3 => {
149 let endtime = i64::from_str_radix(data[1], 16)?;
150 let state = TaskState::from_endtime_and_message(endtime, data[2])?;
151 Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state)))
152 }
153 _ => bail!("wrong number of components"),
154 }
155 }
156
157 /// Create task log directory with correct permissions
158 pub fn create_task_log_dirs() -> Result<(), Error> {
159
160 try_block!({
161 let backup_user = crate::backup::backup_user()?;
162 let opts = CreateOptions::new()
163 .owner(backup_user.uid)
164 .group(backup_user.gid);
165
166 create_path(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
167 create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
168 create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
169 Ok(())
170 }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
171
172 Ok(())
173 }
174
175 /// Read endtime (time of last log line) and exitstatus from task log file
176 /// If there is not a single line with at valid datetime, we assume the
177 /// starttime to be the endtime
178 pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
179
180 let mut status = TaskState::Unknown { endtime: upid.starttime };
181
182 let path = upid.log_path();
183
184 let mut file = File::open(path)?;
185
186 /// speedup - only read tail
187 use std::io::Seek;
188 use std::io::SeekFrom;
189 let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
190
191 let mut data = Vec::with_capacity(8192);
192 file.read_to_end(&mut data)?;
193
194 // strip newlines at the end of the task logs
195 while data.last() == Some(&b'\n') {
196 data.pop();
197 }
198
199 let last_line = match data.iter().rposition(|c| *c == b'\n') {
200 Some(start) if data.len() > (start+1) => &data[start+1..],
201 Some(_) => &data, // should not happen, since we removed all trailing newlines
202 None => &data,
203 };
204
205 let last_line = std::str::from_utf8(last_line)
206 .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
207
208 let mut iter = last_line.splitn(2, ": ");
209 if let Some(time_str) = iter.next() {
210 if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
211 // set the endtime even if we cannot parse the state
212 status = TaskState::Unknown { endtime };
213 if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
214 if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
215 status = state;
216 }
217 }
218 }
219 }
220
221 Ok(status)
222 }
223
224 /// Task State
225 #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
226 pub enum TaskState {
227 /// The Task ended with an undefined state
228 Unknown { endtime: i64 },
229 /// The Task ended and there were no errors or warnings
230 OK { endtime: i64 },
231 /// The Task had 'count' amount of warnings and no errors
232 Warning { count: u64, endtime: i64 },
233 /// The Task ended with the error described in 'message'
234 Error { message: String, endtime: i64 },
235 }
236
237 impl TaskState {
238 pub fn endtime(&self) -> i64 {
239 match *self {
240 TaskState::Unknown { endtime } => endtime,
241 TaskState::OK { endtime } => endtime,
242 TaskState::Warning { endtime, .. } => endtime,
243 TaskState::Error { endtime, .. } => endtime,
244 }
245 }
246
247 pub fn tasktype(&self) -> TaskStateType {
248 match self {
249 TaskState::OK { .. } => TaskStateType::OK,
250 TaskState::Unknown { .. } => TaskStateType::Unknown,
251 TaskState::Error { .. } => TaskStateType::Error,
252 TaskState::Warning { .. } => TaskStateType::Warning,
253 }
254 }
255
256 fn result_text(&self) -> String {
257 match self {
258 TaskState::Error { message, .. } => format!("TASK ERROR: {}", message),
259 other => format!("TASK {}", other),
260 }
261 }
262
263 fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> {
264 if s == "unknown" {
265 Ok(TaskState::Unknown { endtime })
266 } else if s == "OK" {
267 Ok(TaskState::OK { endtime })
268 } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") {
269 let count: u64 = warnings.parse()?;
270 Ok(TaskState::Warning{ count, endtime })
271 } else if !s.is_empty() {
272 let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string();
273 Ok(TaskState::Error{ message, endtime })
274 } else {
275 bail!("unable to parse Task Status '{}'", s);
276 }
277 }
278 }
279
280 impl std::cmp::PartialOrd for TaskState {
281 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
282 Some(self.endtime().cmp(&other.endtime()))
283 }
284 }
285
286 impl std::cmp::Ord for TaskState {
287 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
288 self.endtime().cmp(&other.endtime())
289 }
290 }
291
292 impl std::fmt::Display for TaskState {
293 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294 match self {
295 TaskState::Unknown { .. } => write!(f, "unknown"),
296 TaskState::OK { .. }=> write!(f, "OK"),
297 TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count),
298 TaskState::Error { message, .. } => write!(f, "{}", message),
299 }
300 }
301 }
302
303 /// Task details including parsed UPID
304 ///
305 /// If there is no `state`, the task is still running.
306 #[derive(Debug)]
307 pub struct TaskListInfo {
308 /// The parsed UPID
309 pub upid: UPID,
310 /// UPID string representation
311 pub upid_str: String,
312 /// Task `(endtime, status)` if already finished
313 pub state: Option<TaskState>, // endtime, status
314 }
315
316 fn lock_task_list_files(exclusive: bool) -> Result<std::fs::File, Error> {
317 let backup_user = crate::backup::backup_user()?;
318
319 let lock = open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN, std::time::Duration::new(10, 0), exclusive)?;
320 nix::unistd::chown(PROXMOX_BACKUP_TASK_LOCK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
321
322 Ok(lock)
323 }
324
325 /// checks if the Task Archive is bigger that 'size_threshold' bytes, and
326 /// rotates it if it is
327 pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
328 let _lock = lock_task_list_files(true)?;
329
330 let mut logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, compress)
331 .ok_or_else(|| format_err!("could not get archive file names"))?;
332
333 logrotate.rotate(size_threshold, None, max_files)
334 }
335
336 // atomically read/update the task list, update status of finished tasks
337 // new_upid is added to the list when specified.
338 fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
339
340 let backup_user = crate::backup::backup_user()?;
341
342 let lock = lock_task_list_files(true)?;
343
344 // TODO remove with 1.x
345 let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
346 let had_index_file = !finish_list.is_empty();
347
348 // We use filter_map because one negative case wants to *move* the data into `finish_list`,
349 // clippy doesn't quite catch this!
350 #[allow(clippy::unnecessary_filter_map)]
351 let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
352 .into_iter()
353 .filter_map(|info| {
354 if info.state.is_some() {
355 // this can happen when the active file still includes finished tasks
356 finish_list.push(info);
357 return None;
358 }
359
360 if !worker_is_active_local(&info.upid) {
361 // println!("Detected stopped task '{}'", &info.upid_str);
362 let now = proxmox::tools::time::epoch_i64();
363 let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
364 finish_list.push(TaskListInfo {
365 upid: info.upid,
366 upid_str: info.upid_str,
367 state: Some(status)
368 });
369 return None;
370 }
371
372 Some(info)
373 }).collect();
374
375 if let Some(upid) = new_upid {
376 active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
377 }
378
379 let active_raw = render_task_list(&active_list);
380
381 replace_file(
382 PROXMOX_BACKUP_ACTIVE_TASK_FN,
383 active_raw.as_bytes(),
384 CreateOptions::new()
385 .owner(backup_user.uid)
386 .group(backup_user.gid),
387 )?;
388
389 finish_list.sort_unstable_by(|a, b| {
390 match (&a.state, &b.state) {
391 (Some(s1), Some(s2)) => s1.cmp(&s2),
392 (Some(_), None) => std::cmp::Ordering::Less,
393 (None, Some(_)) => std::cmp::Ordering::Greater,
394 _ => a.upid.starttime.cmp(&b.upid.starttime),
395 }
396 });
397
398 if !finish_list.is_empty() {
399 match std::fs::OpenOptions::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN) {
400 Ok(mut writer) => {
401 for info in &finish_list {
402 writer.write_all(render_task_line(&info).as_bytes())?;
403 }
404 },
405 Err(err) => bail!("could not write task archive - {}", err),
406 }
407
408 nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
409 }
410
411 // TODO Remove with 1.x
412 // for compatibility, if we had an INDEX file, we do not need it anymore
413 if had_index_file {
414 let _ = nix::unistd::unlink(PROXMOX_BACKUP_INDEX_TASK_FN);
415 }
416
417 drop(lock);
418
419 Ok(())
420 }
421
422 fn render_task_line(info: &TaskListInfo) -> String {
423 let mut raw = String::new();
424 if let Some(status) = &info.state {
425 raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status));
426 } else {
427 raw.push_str(&info.upid_str);
428 raw.push('\n');
429 }
430
431 raw
432 }
433
434 fn render_task_list(list: &[TaskListInfo]) -> String {
435 let mut raw = String::new();
436 for info in list {
437 raw.push_str(&render_task_line(&info));
438 }
439 raw
440 }
441
442 // note this is not locked, caller has to make sure it is
443 // this will skip (and log) lines that are not valid status lines
444 fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error>
445 {
446 let reader = BufReader::new(reader);
447 let mut list = Vec::new();
448 for line in reader.lines() {
449 let line = line?;
450 match parse_worker_status_line(&line) {
451 Ok((upid_str, upid, state)) => list.push(TaskListInfo {
452 upid_str,
453 upid,
454 state
455 }),
456 Err(err) => {
457 eprintln!("unable to parse worker status '{}' - {}", line, err);
458 continue;
459 }
460 };
461 }
462
463 Ok(list)
464 }
465
466 // note this is not locked, caller has to make sure it is
467 fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error>
468 where
469 P: AsRef<std::path::Path> + std::fmt::Debug,
470 {
471 let file = match File::open(&path) {
472 Ok(f) => f,
473 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
474 Err(err) => bail!("unable to open task list {:?} - {}", path, err),
475 };
476
477 read_task_file(file)
478 }
479
480 pub struct TaskListInfoIterator {
481 list: VecDeque<TaskListInfo>,
482 end: bool,
483 archive: Option<LogRotateFiles>,
484 lock: Option<File>,
485 }
486
487 impl TaskListInfoIterator {
488 pub fn new(active_only: bool) -> Result<Self, Error> {
489 let (read_lock, active_list) = {
490 let lock = lock_task_list_files(false)?;
491 let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
492
493 let needs_update = active_list
494 .iter()
495 .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
496
497 // TODO remove with 1.x
498 let index_exists = std::path::Path::new(PROXMOX_BACKUP_INDEX_TASK_FN).is_file();
499
500 if needs_update || index_exists {
501 drop(lock);
502 update_active_workers(None)?;
503 let lock = lock_task_list_files(false)?;
504 let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
505 (lock, active_list)
506 } else {
507 (lock, active_list)
508 }
509 };
510
511 let archive = if active_only {
512 None
513 } else {
514 let logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, true)
515 .ok_or_else(|| format_err!("could not get archive file names"))?;
516 Some(logrotate.files())
517 };
518
519 let lock = if active_only { None } else { Some(read_lock) };
520
521 Ok(Self {
522 list: active_list.into(),
523 end: active_only,
524 archive,
525 lock,
526 })
527 }
528 }
529
530 impl Iterator for TaskListInfoIterator {
531 type Item = Result<TaskListInfo, Error>;
532
533 fn next(&mut self) -> Option<Self::Item> {
534 loop {
535 if let Some(element) = self.list.pop_back() {
536 return Some(Ok(element));
537 } else if self.end {
538 return None;
539 } else {
540 if let Some(mut archive) = self.archive.take() {
541 if let Some(file) = archive.next() {
542 let list = match read_task_file(file) {
543 Ok(list) => list,
544 Err(err) => return Some(Err(err)),
545 };
546 self.list.append(&mut list.into());
547 self.archive = Some(archive);
548 continue;
549 }
550 }
551
552 self.end = true;
553 self.lock.take();
554 }
555 }
556 }
557 }
558
559 /// Launch long running worker tasks.
560 ///
561 /// A worker task can either be a whole thread, or a simply tokio
562 /// task/future. Each task can `log()` messages, which are stored
563 /// persistently to files. Task should poll the `abort_requested`
564 /// flag, and stop execution when requested.
565 #[derive(Debug)]
566 pub struct WorkerTask {
567 upid: UPID,
568 data: Mutex<WorkerTaskData>,
569 abort_requested: AtomicBool,
570 }
571
572 impl std::fmt::Display for WorkerTask {
573
574 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
575 self.upid.fmt(f)
576 }
577 }
578
579 #[derive(Debug)]
580 struct WorkerTaskData {
581 logger: FileLogger,
582 progress: f64, // 0..1
583 warn_count: u64,
584 pub abort_listeners: Vec<oneshot::Sender<()>>,
585 }
586
587 impl WorkerTask {
588
589 pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: Authid, to_stdout: bool) -> Result<Arc<Self>, Error> {
590 let upid = UPID::new(worker_type, worker_id, auth_id)?;
591 let task_id = upid.task_id;
592
593 let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR);
594
595 path.push(format!("{:02X}", upid.pstart & 255));
596
597 let backup_user = crate::backup::backup_user()?;
598
599 create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?;
600
601 path.push(upid.to_string());
602
603 let logger_options = FileLogOptions {
604 to_stdout,
605 exclusive: true,
606 prefix_time: true,
607 read: true,
608 ..Default::default()
609 };
610 let logger = FileLogger::new(&path, logger_options)?;
611 nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
612
613 let worker = Arc::new(Self {
614 upid: upid.clone(),
615 abort_requested: AtomicBool::new(false),
616 data: Mutex::new(WorkerTaskData {
617 logger,
618 progress: 0.0,
619 warn_count: 0,
620 abort_listeners: vec![],
621 }),
622 });
623
624 // scope to drop the lock again after inserting
625 {
626 let mut hash = WORKER_TASK_LIST.lock().unwrap();
627 hash.insert(task_id, worker.clone());
628 super::set_worker_count(hash.len());
629 }
630
631 update_active_workers(Some(&upid))?;
632
633 Ok(worker)
634 }
635
636 /// Spawn a new tokio task/future.
637 pub fn spawn<F, T>(
638 worker_type: &str,
639 worker_id: Option<String>,
640 auth_id: Authid,
641 to_stdout: bool,
642 f: F,
643 ) -> Result<String, Error>
644 where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
645 T: Send + 'static + Future<Output = Result<(), Error>>,
646 {
647 let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
648 let upid_str = worker.upid.to_string();
649 let f = f(worker.clone());
650 tokio::spawn(async move {
651 let result = f.await;
652 worker.log_result(&result);
653 });
654
655 Ok(upid_str)
656 }
657
658 /// Create a new worker thread.
659 pub fn new_thread<F>(
660 worker_type: &str,
661 worker_id: Option<String>,
662 auth_id: Authid,
663 to_stdout: bool,
664 f: F,
665 ) -> Result<String, Error>
666 where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>
667 {
668 let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
669 let upid_str = worker.upid.to_string();
670
671 let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || {
672 let worker1 = worker.clone();
673 let result = match std::panic::catch_unwind(move || f(worker1)) {
674 Ok(r) => r,
675 Err(panic) => {
676 match panic.downcast::<&str>() {
677 Ok(panic_msg) => {
678 Err(format_err!("worker panicked: {}", panic_msg))
679 }
680 Err(_) => {
681 Err(format_err!("worker panicked: unknown type."))
682 }
683 }
684 }
685 };
686
687 worker.log_result(&result);
688 });
689
690 Ok(upid_str)
691 }
692
693 /// create state from self and a result
694 pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
695 let warn_count = self.data.lock().unwrap().warn_count;
696
697 let endtime = proxmox::tools::time::epoch_i64();
698
699 if let Err(err) = result {
700 TaskState::Error { message: err.to_string(), endtime }
701 } else if warn_count > 0 {
702 TaskState::Warning { count: warn_count, endtime }
703 } else {
704 TaskState::OK { endtime }
705 }
706 }
707
708 /// Log task result, remove task from running list
709 pub fn log_result(&self, result: &Result<(), Error>) {
710 let state = self.create_state(result);
711 self.log(state.result_text());
712
713 WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
714 let _ = update_active_workers(None);
715 super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
716 }
717
718 /// Log a message.
719 pub fn log<S: AsRef<str>>(&self, msg: S) {
720 let mut data = self.data.lock().unwrap();
721 data.logger.log(msg);
722 }
723
724 /// Log a message as warning.
725 pub fn warn<S: AsRef<str>>(&self, msg: S) {
726 let mut data = self.data.lock().unwrap();
727 data.logger.log(format!("WARN: {}", msg.as_ref()));
728 data.warn_count += 1;
729 }
730
731 /// Set progress indicator
732 pub fn progress(&self, progress: f64) {
733 if progress >= 0.0 && progress <= 1.0 {
734 let mut data = self.data.lock().unwrap();
735 data.progress = progress;
736 } else {
737 // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
738 }
739 }
740
741 /// Request abort
742 pub fn request_abort(&self) {
743 eprintln!("set abort flag for worker {}", self.upid);
744
745 let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst);
746 if !prev_abort { // log abort one time
747 self.log(format!("received abort request ..."));
748 }
749 // noitify listeners
750 let mut data = self.data.lock().unwrap();
751 loop {
752 match data.abort_listeners.pop() {
753 None => { break; },
754 Some(ch) => {
755 let _ = ch.send(()); // ignore errors here
756 },
757 }
758 }
759 }
760
761 /// Test if abort was requested.
762 pub fn abort_requested(&self) -> bool {
763 self.abort_requested.load(Ordering::SeqCst)
764 }
765
766 /// Fail if abort was requested.
767 pub fn fail_on_abort(&self) -> Result<(), Error> {
768 if self.abort_requested() {
769 bail!("abort requested - aborting task");
770 }
771 Ok(())
772 }
773
774 /// Get a future which resolves on task abort
775 pub fn abort_future(&self) -> oneshot::Receiver<()> {
776 let (tx, rx) = oneshot::channel::<()>();
777
778 let mut data = self.data.lock().unwrap();
779 if self.abort_requested() {
780 let _ = tx.send(());
781 } else {
782 data.abort_listeners.push(tx);
783 }
784 rx
785 }
786
787 pub fn upid(&self) -> &UPID {
788 &self.upid
789 }
790 }
791
792 impl crate::task::TaskState for WorkerTask {
793 fn check_abort(&self) -> Result<(), Error> {
794 self.fail_on_abort()
795 }
796
797 fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
798 match level {
799 log::Level::Error => self.warn(&message.to_string()),
800 log::Level::Warn => self.warn(&message.to_string()),
801 log::Level::Info => self.log(&message.to_string()),
802 log::Level::Debug => self.log(&format!("DEBUG: {}", message)),
803 log::Level::Trace => self.log(&format!("TRACE: {}", message)),
804 }
805 }
806 }