// git.proxmox.com Git - proxmox-backup.git - src/server/worker_task.rs
// (history: split proxmox-file-restore into its own crate)
1 use std::collections::{HashMap, VecDeque};
2 use std::fs::File;
3 use std::io::{Read, Write, BufRead, BufReader};
4 use std::panic::UnwindSafe;
5 use std::sync::atomic::{AtomicBool, Ordering};
6 use std::sync::{Arc, Mutex};
7
8 use anyhow::{bail, format_err, Error};
9 use futures::*;
10 use lazy_static::lazy_static;
11 use serde_json::{json, Value};
12 use serde::{Serialize, Deserialize};
13 use tokio::sync::oneshot;
14
15 use proxmox::sys::linux::procfs;
16 use proxmox::try_block;
17 use proxmox::tools::fs::{create_path, replace_file, CreateOptions};
18
19 use pbs_buildcfg;
20 use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
21
22 use super::{UPID, UPIDExt};
23
24 use crate::server;
25 use crate::tools::{FileLogger, FileLogOptions};
26 use crate::api2::types::{Authid, TaskStateType};
27 use crate::backup::{open_backup_lockfile, BackupLockGuard};
28
/// Expands (at compile time) to a path below the task log directory:
/// the build-time log dir, plus "/tasks", plus `$subdir`.
macro_rules! taskdir {
    ($subdir:expr) => (concat!(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
}
// Directory holding per-task log files (sharded into subdirs by pstart byte).
pub const PROXMOX_BACKUP_TASK_DIR: &str = taskdir!("/");
// Lock file guarding atomic updates of the active/archive task lists.
pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = taskdir!("/.active.lock");
// List of currently running (not yet archived) tasks, one line each.
pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = taskdir!("/active");
// Legacy (pre-1.x) index file; merged into the archive and then removed.
pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = taskdir!("/index");
// Append-only archive of finished tasks (log-rotated).
pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive");
37
lazy_static! {
    /// Process-global registry of the worker tasks running in this process,
    /// keyed by `UPID::task_id`. Its length also drives `set_worker_count()`.
    static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
}
41
42 /// checks if the task UPID refers to a worker from this process
43 fn is_local_worker(upid: &UPID) -> bool {
44 upid.pid == server::pid() && upid.pstart == server::pstart()
45 }
46
47 /// Test if the task is still running
48 pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
49 if is_local_worker(upid) {
50 return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
51 }
52
53 if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() {
54 return Ok(false);
55 }
56
57 let sock = server::ctrl_sock_from_pid(upid.pid);
58 let cmd = json!({
59 "command": "worker-task-status",
60 "args": {
61 "upid": upid.to_string(),
62 },
63 });
64 let status = super::send_command(sock, &cmd).await?;
65
66 if let Some(active) = status.as_bool() {
67 Ok(active)
68 } else {
69 bail!("got unexpected result {:?} (expected bool)", status);
70 }
71 }
72
73 /// Test if the task is still running (fast but inaccurate implementation)
74 ///
75 /// If the task is spawned from a different process, we simply return if
76 /// that process is still running. This information is good enough to detect
77 /// stale tasks...
78 pub fn worker_is_active_local(upid: &UPID) -> bool {
79 if is_local_worker(upid) {
80 WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
81 } else {
82 procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
83 }
84 }
85
/// Register the task control commands on the given command socket.
///
/// This lets other local daemons query (`worker-task-status`) or abort
/// (`worker-task-abort`) worker tasks owned by this process.
pub fn register_task_control_commands(
    commando_sock: &mut super::CommandoSocket,
) -> Result<(), Error> {
    // Extract and validate the "upid" argument; only UPIDs belonging to
    // this very process are accepted.
    fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
        let args = if let Some(args) = args { args } else { bail!("missing args") };
        let upid = match args.get("upid") {
            Some(Value::String(upid)) => upid.parse::<UPID>()?,
            None => bail!("no upid in args"),
            _ => bail!("unable to parse upid"),
        };
        if !is_local_worker(&upid) {
            bail!("upid does not belong to this process");
        }
        Ok(upid)
    }

    commando_sock.register_command("worker-task-abort".into(), move |args| {
        let upid = get_upid(args)?;

        // Unknown (already finished) tasks are silently ignored.
        if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) {
            worker.request_abort();
        }
        Ok(Value::Null)
    })?;
    commando_sock.register_command("worker-task-status".into(), move |args| {
        let upid = get_upid(args)?;

        let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id);

        Ok(active.into())
    })?;

    Ok(())
}
120
121 pub fn abort_worker_async(upid: UPID) {
122 tokio::spawn(async move {
123 if let Err(err) = abort_worker(upid).await {
124 eprintln!("abort worker failed - {}", err);
125 }
126 });
127 }
128
129 pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
130
131 let sock = server::ctrl_sock_from_pid(upid.pid);
132 let cmd = json!({
133 "command": "worker-task-abort",
134 "args": {
135 "upid": upid.to_string(),
136 },
137 });
138 super::send_command(sock, &cmd).map_ok(|_| ()).await
139 }
140
141 fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
142
143 let data = line.splitn(3, ' ').collect::<Vec<&str>>();
144
145 let len = data.len();
146
147 match len {
148 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)),
149 3 => {
150 let endtime = i64::from_str_radix(data[1], 16)?;
151 let state = TaskState::from_endtime_and_message(endtime, data[2])?;
152 Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state)))
153 }
154 _ => bail!("wrong number of components"),
155 }
156 }
157
158 /// Create task log directory with correct permissions
159 pub fn create_task_log_dirs() -> Result<(), Error> {
160
161 try_block!({
162 let backup_user = crate::backup::backup_user()?;
163 let opts = CreateOptions::new()
164 .owner(backup_user.uid)
165 .group(backup_user.gid);
166
167 create_path(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
168 create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
169 create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
170 Ok(())
171 }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
172
173 Ok(())
174 }
175
176 /// Read endtime (time of last log line) and exitstatus from task log file
177 /// If there is not a single line with at valid datetime, we assume the
178 /// starttime to be the endtime
179 pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
180
181 let mut status = TaskState::Unknown { endtime: upid.starttime };
182
183 let path = upid.log_path();
184
185 let mut file = File::open(path)?;
186
187 /// speedup - only read tail
188 use std::io::Seek;
189 use std::io::SeekFrom;
190 let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
191
192 let mut data = Vec::with_capacity(8192);
193 file.read_to_end(&mut data)?;
194
195 // strip newlines at the end of the task logs
196 while data.last() == Some(&b'\n') {
197 data.pop();
198 }
199
200 let last_line = match data.iter().rposition(|c| *c == b'\n') {
201 Some(start) if data.len() > (start+1) => &data[start+1..],
202 Some(_) => &data, // should not happen, since we removed all trailing newlines
203 None => &data,
204 };
205
206 let last_line = std::str::from_utf8(last_line)
207 .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
208
209 let mut iter = last_line.splitn(2, ": ");
210 if let Some(time_str) = iter.next() {
211 if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
212 // set the endtime even if we cannot parse the state
213 status = TaskState::Unknown { endtime };
214 if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
215 if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
216 status = state;
217 }
218 }
219 }
220 }
221
222 Ok(status)
223 }
224
/// Task State
///
/// Rendered into (and parsed back from) the status line of the task
/// archive; ordering via `Ord` is by `endtime` only.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskState {
    /// The Task ended with an undefined state
    Unknown { endtime: i64 },
    /// The Task ended and there were no errors or warnings
    OK { endtime: i64 },
    /// The Task had 'count' amount of warnings and no errors
    Warning { count: u64, endtime: i64 },
    /// The Task ended with the error described in 'message'
    Error { message: String, endtime: i64 },
}
237
238 impl TaskState {
239 pub fn endtime(&self) -> i64 {
240 match *self {
241 TaskState::Unknown { endtime } => endtime,
242 TaskState::OK { endtime } => endtime,
243 TaskState::Warning { endtime, .. } => endtime,
244 TaskState::Error { endtime, .. } => endtime,
245 }
246 }
247
248 pub fn tasktype(&self) -> TaskStateType {
249 match self {
250 TaskState::OK { .. } => TaskStateType::OK,
251 TaskState::Unknown { .. } => TaskStateType::Unknown,
252 TaskState::Error { .. } => TaskStateType::Error,
253 TaskState::Warning { .. } => TaskStateType::Warning,
254 }
255 }
256
257 fn result_text(&self) -> String {
258 match self {
259 TaskState::Error { message, .. } => format!("TASK ERROR: {}", message),
260 other => format!("TASK {}", other),
261 }
262 }
263
264 fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> {
265 if s == "unknown" {
266 Ok(TaskState::Unknown { endtime })
267 } else if s == "OK" {
268 Ok(TaskState::OK { endtime })
269 } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") {
270 let count: u64 = warnings.parse()?;
271 Ok(TaskState::Warning{ count, endtime })
272 } else if !s.is_empty() {
273 let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string();
274 Ok(TaskState::Error{ message, endtime })
275 } else {
276 bail!("unable to parse Task Status '{}'", s);
277 }
278 }
279 }
280
281 impl std::cmp::PartialOrd for TaskState {
282 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
283 Some(self.endtime().cmp(&other.endtime()))
284 }
285 }
286
287 impl std::cmp::Ord for TaskState {
288 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
289 self.endtime().cmp(&other.endtime())
290 }
291 }
292
293 impl std::fmt::Display for TaskState {
294 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
295 match self {
296 TaskState::Unknown { .. } => write!(f, "unknown"),
297 TaskState::OK { .. }=> write!(f, "OK"),
298 TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count),
299 TaskState::Error { message, .. } => write!(f, "{}", message),
300 }
301 }
302 }
303
/// Task details including parsed UPID
///
/// If there is no `state`, the task is still running.
#[derive(Debug)]
pub struct TaskListInfo {
    /// The parsed UPID
    pub upid: UPID,
    /// UPID string representation (as read from the task list file)
    pub upid_str: String,
    /// Task `(endtime, status)` if already finished
    pub state: Option<TaskState>, // endtime, status
}
316
317 impl Into<pbs_api_types::TaskListItem> for TaskListInfo {
318 fn into(self) -> pbs_api_types::TaskListItem {
319 let (endtime, status) = self
320 .state
321 .map_or_else(|| (None, None), |a| (Some(a.endtime()), Some(a.to_string())));
322
323 pbs_api_types::TaskListItem {
324 upid: self.upid_str,
325 node: "localhost".to_string(),
326 pid: self.upid.pid as i64,
327 pstart: self.upid.pstart,
328 starttime: self.upid.starttime,
329 worker_type: self.upid.worker_type,
330 worker_id: self.upid.worker_id,
331 user: self.upid.auth_id,
332 endtime,
333 status,
334 }
335 }
336 }
337
/// Open (and lock) the task-list lock file.
///
/// `exclusive` presumably maps to an exclusive vs. shared lock in
/// `open_backup_lockfile` — callers take `true` before modifying the
/// active/archive files and `false` for reading (verify against helper).
fn lock_task_list_files(exclusive: bool) -> Result<BackupLockGuard, Error> {
    open_backup_lockfile(PROXMOX_BACKUP_TASK_LOCK_FN, None, exclusive)
}
341
/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
/// rotates it if it is
///
/// The returned bool presumably indicates whether a rotation actually
/// happened (see `LogRotate::rotate`).
pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
    // Exclusive lock: rotation replaces/renames the archive files.
    let _lock = lock_task_list_files(true)?;

    let mut logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, compress)
        .ok_or_else(|| format_err!("could not get archive file names"))?;

    logrotate.rotate(size_threshold, None, max_files)
}
352
// atomically read/update the task list, update status of finished tasks
// new_upid is added to the list when specified.
fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {

    let backup_user = crate::backup::backup_user()?;

    // Exclusive lock: we rewrite the active file and append to the archive.
    let lock = lock_task_list_files(true)?;

    // TODO remove with 1.x
    // Legacy index file: any entries in it are merged into the archive below.
    let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
    let had_index_file = !finish_list.is_empty();

    // We use filter_map because one negative case wants to *move* the data into `finish_list`,
    // clippy doesn't quite catch this!
    #[allow(clippy::unnecessary_filter_map)]
    let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
        .into_iter()
        .filter_map(|info| {
            if info.state.is_some() {
                // this can happen when the active file still includes finished tasks
                finish_list.push(info);
                return None;
            }

            if !worker_is_active_local(&info.upid) {
                // println!("Detected stopped task '{}'", &info.upid_str);
                // Worker is gone: read its final state from the log file,
                // falling back to Unknown with the current time.
                let now = proxmox::tools::time::epoch_i64();
                let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
                finish_list.push(TaskListInfo {
                    upid: info.upid,
                    upid_str: info.upid_str,
                    state: Some(status)
                });
                return None;
            }

            // Still running: keep it in the active list.
            Some(info)
        }).collect();

    if let Some(upid) = new_upid {
        active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
    }

    let active_raw = render_task_list(&active_list);

    // Atomically replace the active file, owned by the backup user.
    replace_file(
        PROXMOX_BACKUP_ACTIVE_TASK_FN,
        active_raw.as_bytes(),
        CreateOptions::new()
            .owner(backup_user.uid)
            .group(backup_user.gid),
    )?;

    // Sort by endtime (entries without a state last, ordered by starttime)
    // so the archive stays roughly chronological.
    finish_list.sort_unstable_by(|a, b| {
        match (&a.state, &b.state) {
            (Some(s1), Some(s2)) => s1.cmp(&s2),
            (Some(_), None) => std::cmp::Ordering::Less,
            (None, Some(_)) => std::cmp::Ordering::Greater,
            _ => a.upid.starttime.cmp(&b.upid.starttime),
        }
    });

    if !finish_list.is_empty() {
        // Append finished tasks to the (possibly not yet existing) archive.
        match std::fs::OpenOptions::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN) {
            Ok(mut writer) => {
                for info in &finish_list {
                    writer.write_all(render_task_line(&info).as_bytes())?;
                }
            },
            Err(err) => bail!("could not write task archive - {}", err),
        }

        nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
    }

    // TODO Remove with 1.x
    // for compatibility, if we had an INDEX file, we do not need it anymore
    if had_index_file {
        let _ = nix::unistd::unlink(PROXMOX_BACKUP_INDEX_TASK_FN);
    }

    drop(lock);

    Ok(())
}
438
439 fn render_task_line(info: &TaskListInfo) -> String {
440 let mut raw = String::new();
441 if let Some(status) = &info.state {
442 raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status));
443 } else {
444 raw.push_str(&info.upid_str);
445 raw.push('\n');
446 }
447
448 raw
449 }
450
451 fn render_task_list(list: &[TaskListInfo]) -> String {
452 let mut raw = String::new();
453 for info in list {
454 raw.push_str(&render_task_line(&info));
455 }
456 raw
457 }
458
459 // note this is not locked, caller has to make sure it is
460 // this will skip (and log) lines that are not valid status lines
461 fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error>
462 {
463 let reader = BufReader::new(reader);
464 let mut list = Vec::new();
465 for line in reader.lines() {
466 let line = line?;
467 match parse_worker_status_line(&line) {
468 Ok((upid_str, upid, state)) => list.push(TaskListInfo {
469 upid_str,
470 upid,
471 state
472 }),
473 Err(err) => {
474 eprintln!("unable to parse worker status '{}' - {}", line, err);
475 continue;
476 }
477 };
478 }
479
480 Ok(list)
481 }
482
483 // note this is not locked, caller has to make sure it is
484 fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error>
485 where
486 P: AsRef<std::path::Path> + std::fmt::Debug,
487 {
488 let file = match File::open(&path) {
489 Ok(f) => f,
490 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
491 Err(err) => bail!("unable to open task list {:?} - {}", path, err),
492 };
493
494 read_task_file(file)
495 }
496
/// Iterator over all tasks: first the active list, then (unless
/// `active_only`) every line of every archive file.
pub struct TaskListInfoIterator {
    /// Tasks not yet yielded; popped from the back in `next()`.
    list: VecDeque<TaskListInfo>,
    /// Set once both the list and the archive files are exhausted.
    end: bool,
    /// Remaining archive files (`None` in active-only mode).
    archive: Option<LogRotateFiles>,
    /// Read lock held while iterating archives (`None` in active-only mode).
    lock: Option<BackupLockGuard>,
}
503
impl TaskListInfoIterator {
    /// Create a new iterator.
    ///
    /// `active_only == true` yields only currently active tasks; otherwise
    /// the archive files are iterated afterwards as well.
    pub fn new(active_only: bool) -> Result<Self, Error> {
        let (read_lock, active_list) = {
            let lock = lock_task_list_files(false)?;
            let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;

            // The active file is stale if it contains finished entries or
            // tasks whose worker is no longer running.
            let needs_update = active_list
                .iter()
                .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));

            // TODO remove with 1.x
            let index_exists = std::path::Path::new(PROXMOX_BACKUP_INDEX_TASK_FN).is_file();

            if needs_update || index_exists {
                // Drop the shared lock before update_active_workers() takes
                // the exclusive one, then re-acquire and re-read the list.
                drop(lock);
                update_active_workers(None)?;
                let lock = lock_task_list_files(false)?;
                let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
                (lock, active_list)
            } else {
                (lock, active_list)
            }
        };

        let archive = if active_only {
            None
        } else {
            let logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, true)
                .ok_or_else(|| format_err!("could not get archive file names"))?;
            Some(logrotate.files())
        };

        // Keep the shared lock only while we will iterate archive files.
        let lock = if active_only { None } else { Some(read_lock) };

        Ok(Self {
            list: active_list.into(),
            end: active_only,
            archive,
            lock,
        })
    }
}
546
impl Iterator for TaskListInfoIterator {
    type Item = Result<TaskListInfo, Error>;

    /// Yield tasks from the in-memory list first (back to front), then
    /// refill the list from the next archive file until all are exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(element) = self.list.pop_back() {
                return Some(Ok(element));
            } else if self.end {
                return None;
            } else {
                // List is empty: try to refill from the next archive file.
                if let Some(mut archive) = self.archive.take() {
                    if let Some(file) = archive.next() {
                        let list = match read_task_file(file) {
                            Ok(list) => list,
                            Err(err) => return Some(Err(err)),
                        };
                        self.list.append(&mut list.into());
                        self.archive = Some(archive);
                        continue;
                    }
                }

                // No more archive files: stop and release the read lock.
                self.end = true;
                self.lock.take();
            }
        }
    }
}
575
/// Launch long running worker tasks.
///
/// A worker task can either be a whole thread, or a simply tokio
/// task/future. Each task can `log()` messages, which are stored
/// persistently to files. Task should poll the `abort_requested`
/// flag, and stop execution when requested.
#[derive(Debug)]
pub struct WorkerTask {
    /// Unique identifier of this task (also names its log file).
    upid: UPID,
    /// Mutable per-task state: logger, progress, abort listeners.
    data: Mutex<WorkerTaskData>,
    /// Set once by `request_abort()`; polled by the task itself.
    abort_requested: AtomicBool,
}
588
589 impl std::fmt::Display for WorkerTask {
590
591 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
592 self.upid.fmt(f)
593 }
594 }
595
/// Inner mutable state of a [`WorkerTask`], guarded by its `data` mutex.
#[derive(Debug)]
struct WorkerTaskData {
    // Per-task log file writer.
    logger: FileLogger,
    // Task progress; `WorkerTask::progress` only accepts values in 0..=1.
    progress: f64, // 0..1
    // Number of `warn()` calls; decides OK vs. Warning in `create_state`.
    warn_count: u64,
    // Channels notified (and drained) when an abort is requested.
    pub abort_listeners: Vec<oneshot::Sender<()>>,
}
603
604 impl WorkerTask {
605
606 pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: Authid, to_stdout: bool) -> Result<Arc<Self>, Error> {
607 let upid = UPID::new(worker_type, worker_id, auth_id)?;
608 let task_id = upid.task_id;
609
610 let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR);
611
612 path.push(format!("{:02X}", upid.pstart & 255));
613
614 let backup_user = crate::backup::backup_user()?;
615
616 create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?;
617
618 path.push(upid.to_string());
619
620 let logger_options = FileLogOptions {
621 to_stdout,
622 exclusive: true,
623 prefix_time: true,
624 read: true,
625 ..Default::default()
626 };
627 let logger = FileLogger::new(&path, logger_options)?;
628 nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
629
630 let worker = Arc::new(Self {
631 upid: upid.clone(),
632 abort_requested: AtomicBool::new(false),
633 data: Mutex::new(WorkerTaskData {
634 logger,
635 progress: 0.0,
636 warn_count: 0,
637 abort_listeners: vec![],
638 }),
639 });
640
641 // scope to drop the lock again after inserting
642 {
643 let mut hash = WORKER_TASK_LIST.lock().unwrap();
644 hash.insert(task_id, worker.clone());
645 super::set_worker_count(hash.len());
646 }
647
648 update_active_workers(Some(&upid))?;
649
650 Ok(worker)
651 }
652
653 /// Spawn a new tokio task/future.
654 pub fn spawn<F, T>(
655 worker_type: &str,
656 worker_id: Option<String>,
657 auth_id: Authid,
658 to_stdout: bool,
659 f: F,
660 ) -> Result<String, Error>
661 where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
662 T: Send + 'static + Future<Output = Result<(), Error>>,
663 {
664 let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
665 let upid_str = worker.upid.to_string();
666 let f = f(worker.clone());
667 tokio::spawn(async move {
668 let result = f.await;
669 worker.log_result(&result);
670 });
671
672 Ok(upid_str)
673 }
674
675 /// Create a new worker thread.
676 pub fn new_thread<F>(
677 worker_type: &str,
678 worker_id: Option<String>,
679 auth_id: Authid,
680 to_stdout: bool,
681 f: F,
682 ) -> Result<String, Error>
683 where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>
684 {
685 let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
686 let upid_str = worker.upid.to_string();
687
688 let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || {
689 let worker1 = worker.clone();
690 let result = match std::panic::catch_unwind(move || f(worker1)) {
691 Ok(r) => r,
692 Err(panic) => {
693 match panic.downcast::<&str>() {
694 Ok(panic_msg) => {
695 Err(format_err!("worker panicked: {}", panic_msg))
696 }
697 Err(_) => {
698 Err(format_err!("worker panicked: unknown type."))
699 }
700 }
701 }
702 };
703
704 worker.log_result(&result);
705 });
706
707 Ok(upid_str)
708 }
709
710 /// create state from self and a result
711 pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
712 let warn_count = self.data.lock().unwrap().warn_count;
713
714 let endtime = proxmox::tools::time::epoch_i64();
715
716 if let Err(err) = result {
717 TaskState::Error { message: err.to_string(), endtime }
718 } else if warn_count > 0 {
719 TaskState::Warning { count: warn_count, endtime }
720 } else {
721 TaskState::OK { endtime }
722 }
723 }
724
725 /// Log task result, remove task from running list
726 pub fn log_result(&self, result: &Result<(), Error>) {
727 let state = self.create_state(result);
728 self.log(state.result_text());
729
730 WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
731 let _ = update_active_workers(None);
732 super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
733 }
734
735 /// Log a message.
736 pub fn log<S: AsRef<str>>(&self, msg: S) {
737 let mut data = self.data.lock().unwrap();
738 data.logger.log(msg);
739 }
740
741 /// Log a message as warning.
742 pub fn warn<S: AsRef<str>>(&self, msg: S) {
743 let mut data = self.data.lock().unwrap();
744 data.logger.log(format!("WARN: {}", msg.as_ref()));
745 data.warn_count += 1;
746 }
747
748 /// Set progress indicator
749 pub fn progress(&self, progress: f64) {
750 if progress >= 0.0 && progress <= 1.0 {
751 let mut data = self.data.lock().unwrap();
752 data.progress = progress;
753 } else {
754 // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
755 }
756 }
757
758 /// Request abort
759 pub fn request_abort(&self) {
760 eprintln!("set abort flag for worker {}", self.upid);
761
762 let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst);
763 if !prev_abort { // log abort one time
764 self.log(format!("received abort request ..."));
765 }
766 // noitify listeners
767 let mut data = self.data.lock().unwrap();
768 loop {
769 match data.abort_listeners.pop() {
770 None => { break; },
771 Some(ch) => {
772 let _ = ch.send(()); // ignore errors here
773 },
774 }
775 }
776 }
777
778 /// Test if abort was requested.
779 pub fn abort_requested(&self) -> bool {
780 self.abort_requested.load(Ordering::SeqCst)
781 }
782
783 /// Fail if abort was requested.
784 pub fn fail_on_abort(&self) -> Result<(), Error> {
785 if self.abort_requested() {
786 bail!("abort requested - aborting task");
787 }
788 Ok(())
789 }
790
791 /// Get a future which resolves on task abort
792 pub fn abort_future(&self) -> oneshot::Receiver<()> {
793 let (tx, rx) = oneshot::channel::<()>();
794
795 let mut data = self.data.lock().unwrap();
796 if self.abort_requested() {
797 let _ = tx.send(());
798 } else {
799 data.abort_listeners.push(tx);
800 }
801 rx
802 }
803
804 pub fn upid(&self) -> &UPID {
805 &self.upid
806 }
807 }
808
impl pbs_datastore::task::TaskState for WorkerTask {
    /// Map the generic abort check onto `fail_on_abort`.
    fn check_abort(&self) -> Result<(), Error> {
        self.fail_on_abort()
    }

    /// Route `log` crate levels to the task logger.
    ///
    /// NOTE(review): `Error` is forwarded to `warn()` (there is no separate
    /// error channel), so logged errors also bump the warning counter —
    /// looks intentional, but worth confirming.
    fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
        match level {
            log::Level::Error => self.warn(&message.to_string()),
            log::Level::Warn => self.warn(&message.to_string()),
            log::Level::Info => self.log(&message.to_string()),
            log::Level::Debug => self.log(&format!("DEBUG: {}", message)),
            log::Level::Trace => self.log(&format!("TRACE: {}", message)),
        }
    }
}
824
825 /// Wait for a locally spanned worker task
826 ///
827 /// Note: local workers should print logs to stdout, so there is no
828 /// need to fetch/display logs. We just wait for the worker to finish.
829 pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
830
831 let upid: UPID = upid_str.parse()?;
832
833 let sleep_duration = core::time::Duration::new(0, 100_000_000);
834
835 loop {
836 if worker_is_active_local(&upid) {
837 tokio::time::sleep(sleep_duration).await;
838 } else {
839 break;
840 }
841 }
842 Ok(())
843 }