// proxmox-backup.git: proxmox-rest-server/src/worker_task.rs (commit: use new proxmox-sys crate)
1 use std::collections::{HashMap, VecDeque};
2 use std::fs::File;
3 use std::path::PathBuf;
4 use std::io::{Read, Write, BufRead, BufReader};
5 use std::panic::UnwindSafe;
6 use std::sync::atomic::{AtomicBool, Ordering};
7 use std::sync::{Arc, Mutex};
8 use std::time::{SystemTime, Duration};
9
10 use anyhow::{bail, format_err, Error};
11 use futures::*;
12 use lazy_static::lazy_static;
13 use serde_json::{json, Value};
14 use serde::{Serialize, Deserialize};
15 use tokio::sync::oneshot;
16 use nix::fcntl::OFlag;
17 use once_cell::sync::OnceCell;
18
19 use proxmox::sys::linux::procfs;
20 use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
21 use proxmox_lang::try_block;
22 use proxmox_schema::upid::UPID;
23
24 use proxmox_sys::worker_task_context::WorkerTaskContext;
25 use proxmox_sys::logrotate::{LogRotate, LogRotateFiles};
26
27 use crate::{CommandSocket, FileLogger, FileLogOptions};
28
29 struct TaskListLockGuard(File);
30
31 struct WorkerTaskSetup {
32 file_opts: CreateOptions,
33 taskdir: PathBuf,
34 task_lock_fn: PathBuf,
35 active_tasks_fn: PathBuf,
36 task_index_fn: PathBuf,
37 task_archive_fn: PathBuf,
38 }
39
40 static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
41
42 fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
43 WORKER_TASK_SETUP.get()
44 .ok_or_else(|| format_err!("WorkerTask library is not initialized"))
45 }
46
47 impl WorkerTaskSetup {
48
49 fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {
50
51 let mut taskdir = basedir.clone();
52 taskdir.push("tasks");
53
54 let mut task_lock_fn = taskdir.clone();
55 task_lock_fn.push(".active.lock");
56
57 let mut active_tasks_fn = taskdir.clone();
58 active_tasks_fn.push("active");
59
60 let mut task_index_fn = taskdir.clone();
61 task_index_fn.push("index");
62
63 let mut task_archive_fn = taskdir.clone();
64 task_archive_fn.push("archive");
65
66 Self {
67 file_opts,
68 taskdir,
69 task_lock_fn,
70 active_tasks_fn,
71 task_index_fn,
72 task_archive_fn,
73 }
74 }
75
76 fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {
77 let options = self.file_opts.clone()
78 .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
79
80 let timeout = std::time::Duration::new(10, 0);
81
82 let file = proxmox::tools::fs::open_file_locked(
83 &self.task_lock_fn,
84 timeout,
85 exclusive,
86 options,
87 )?;
88
89 Ok(TaskListLockGuard(file))
90 }
91
92 fn log_path(&self, upid: &UPID) -> std::path::PathBuf {
93 let mut path = self.taskdir.clone();
94 path.push(format!("{:02X}", upid.pstart % 256));
95 path.push(upid.to_string());
96 path
97 }
98
99 // atomically read/update the task list, update status of finished tasks
100 // new_upid is added to the list when specified.
101 fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {
102
103 let lock = self.lock_task_list_files(true)?;
104
105 // TODO remove with 1.x
106 let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?;
107 let had_index_file = !finish_list.is_empty();
108
109 // We use filter_map because one negative case wants to *move* the data into `finish_list`,
110 // clippy doesn't quite catch this!
111 #[allow(clippy::unnecessary_filter_map)]
112 let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)?
113 .into_iter()
114 .filter_map(|info| {
115 if info.state.is_some() {
116 // this can happen when the active file still includes finished tasks
117 finish_list.push(info);
118 return None;
119 }
120
121 if !worker_is_active_local(&info.upid) {
122 // println!("Detected stopped task '{}'", &info.upid_str);
123 let now = proxmox_time::epoch_i64();
124 let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
125 finish_list.push(TaskListInfo {
126 upid: info.upid,
127 upid_str: info.upid_str,
128 state: Some(status)
129 });
130 return None;
131 }
132
133 Some(info)
134 }).collect();
135
136 if let Some(upid) = new_upid {
137 active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
138 }
139
140 let active_raw = render_task_list(&active_list);
141
142 let options = self.file_opts.clone()
143 .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
144
145 replace_file(
146 &self.active_tasks_fn,
147 active_raw.as_bytes(),
148 options,
149 false,
150 )?;
151
152 finish_list.sort_unstable_by(|a, b| {
153 match (&a.state, &b.state) {
154 (Some(s1), Some(s2)) => s1.cmp(&s2),
155 (Some(_), None) => std::cmp::Ordering::Less,
156 (None, Some(_)) => std::cmp::Ordering::Greater,
157 _ => a.upid.starttime.cmp(&b.upid.starttime),
158 }
159 });
160
161 if !finish_list.is_empty() {
162 let options = self.file_opts.clone()
163 .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
164
165 let mut writer = atomic_open_or_create_file(
166 &self.task_archive_fn,
167 OFlag::O_APPEND | OFlag::O_RDWR,
168 &[],
169 options,
170 false,
171 )?;
172 for info in &finish_list {
173 writer.write_all(render_task_line(&info).as_bytes())?;
174 }
175 }
176
177 // TODO Remove with 1.x
178 // for compatibility: if there was an INDEX file, it is no longer needed and can be removed
179 if had_index_file {
180 let _ = nix::unistd::unlink(&self.task_index_fn);
181 }
182
183 drop(lock);
184
185 Ok(())
186 }
187
188 // Create task log directory with correct permissions
189 fn create_task_log_dirs(&self) -> Result<(), Error> {
190
191 try_block!({
192 let dir_opts = self.file_opts.clone()
193 .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
194
195 create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?;
196 // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
197 Ok(())
198 }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
199 }
200 }
201
202 /// Initialize the WorkerTask library
203 pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
204 let setup = WorkerTaskSetup::new(basedir, file_opts);
205 setup.create_task_log_dirs()?;
206 WORKER_TASK_SETUP.set(setup)
207 .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
208 }
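// Illustrative usage sketch (not part of the original file): a daemon would
// typically call this once at startup, before creating any WorkerTask. The
// base directory and the default CreateOptions below are hypothetical example
// values; real callers pass their own log directory, ownership and permissions.
//
//     let file_opts = CreateOptions::new();
//     init_worker_tasks(PathBuf::from("/var/log/example-daemon"), file_opts)?;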
209
210 /// checks if the Task Archive is bigger than 'size_threshold' bytes, and
211 /// rotates it if it is
212 pub fn rotate_task_log_archive(
213 size_threshold: u64,
214 compress: bool,
215 max_files: Option<usize>,
216 options: Option<CreateOptions>,
217 ) -> Result<bool, Error> {
218
219 let setup = worker_task_setup()?;
220
221 let _lock = setup.lock_task_list_files(true)?;
222
223 let mut logrotate = LogRotate::new(
224 &setup.task_archive_fn,
225 compress,
226 max_files,
227 options,
228 )?;
229
230 logrotate.rotate(size_threshold)
231 }
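// Illustrative sketch (assumption: called from a periodic maintenance job;
// the size threshold and file count are example values only):
//
//     // rotate once the archive exceeds ~500 KiB, keep at most 20 rotated files
//     let rotated = rotate_task_log_archive(500 * 1024, true, Some(20), None)?;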
232
233 /// removes all task logs that are older than the oldest task entry in the
234 /// task archive
235 pub fn cleanup_old_tasks(compressed: bool) -> Result<(), Error> {
236 let setup = worker_task_setup()?;
237
238 let _lock = setup.lock_task_list_files(true)?;
239
240 let logrotate = LogRotate::new(
241 &setup.task_archive_fn,
242 compressed,
243 None,
244 None,
245 )?;
246
247 let mut timestamp = None;
248 if let Some(last_file) = logrotate.files().last() {
249 let reader = BufReader::new(last_file);
250 for line in reader.lines() {
251 let line = line?;
252 if let Ok((_, _, Some(state))) = parse_worker_status_line(&line) {
253 timestamp = Some(state.endtime());
254 break;
255 }
256 }
257 }
258
259 fn get_modified(entry: std::fs::DirEntry) -> Result<SystemTime, std::io::Error> {
260 entry.metadata()?.modified()
261 }
262
263 if let Some(timestamp) = timestamp {
264 let cutoff_time = if timestamp > 0 {
265 SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(timestamp as u64))
266 } else {
267 SystemTime::UNIX_EPOCH.checked_sub(Duration::from_secs(-timestamp as u64))
268 }.ok_or_else(|| format_err!("could not calculate cutoff time"))?;
269
270 for i in 0..256 {
271 let mut path = setup.taskdir.clone();
272 path.push(format!("{:02X}", i));
273 for file in std::fs::read_dir(path)? {
274 let file = file?;
275 let path = file.path();
276
277 let modified = get_modified(file)
278 .map_err(|err| format_err!("error getting mtime for {:?}: {}", path, err))?;
279
280 if modified < cutoff_time {
281 match std::fs::remove_file(path) {
282 Ok(()) => {},
283 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {},
284 Err(err) => bail!("could not remove file: {}", err),
285 }
286 }
287 }
288 }
289 }
290
291 Ok(())
292 }
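// Illustrative sketch: usually run right after a successful rotation, with the
// same compression setting that was used for the archive (example values):
//
//     if rotate_task_log_archive(500 * 1024, true, Some(20), None)? {
//         cleanup_old_tasks(true)?;
//     }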
293
294
295 /// Path to the worker log file
296 pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
297 let setup = worker_task_setup()?;
298 Ok(setup.log_path(upid))
299 }
300
301 /// Read endtime (time of last log line) and exit status from the task log file
302 /// If there is not a single line with a valid datetime, we assume the
303 /// starttime to be the endtime
304 pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
305
306 let setup = worker_task_setup()?;
307
308 let mut status = TaskState::Unknown { endtime: upid.starttime };
309
310 let path = setup.log_path(upid);
311
312 let mut file = File::open(path)?;
313
314 // speedup - only read the tail
315 use std::io::Seek;
316 use std::io::SeekFrom;
317 let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
318
319 let mut data = Vec::with_capacity(8192);
320 file.read_to_end(&mut data)?;
321
322 // strip newlines at the end of the task logs
323 while data.last() == Some(&b'\n') {
324 data.pop();
325 }
326
327 let last_line = match data.iter().rposition(|c| *c == b'\n') {
328 Some(start) if data.len() > (start+1) => &data[start+1..],
329 Some(_) => &data, // should not happen, since we removed all trailing newlines
330 None => &data,
331 };
332
333 let last_line = std::str::from_utf8(last_line)
334 .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
335
336 let mut iter = last_line.splitn(2, ": ");
337 if let Some(time_str) = iter.next() {
338 if let Ok(endtime) = proxmox_time::parse_rfc3339(time_str) {
339 // set the endtime even if we cannot parse the state
340 status = TaskState::Unknown { endtime };
341 if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
342 if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
343 status = state;
344 }
345 }
346 }
347 }
348
349 Ok(status)
350 }
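// Illustrative sketch (the UPID string is deliberately elided; a full, valid
// UPID from this installation is required):
//
//     let upid: UPID = upid_str.parse()?;
//     match upid_read_status(&upid)? {
//         TaskState::OK { endtime } => println!("finished OK at {}", endtime),
//         other => println!("task ended: {}", other),
//     }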
351
352 lazy_static! {
353 static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
354 }
355
356 /// checks if the task UPID refers to a worker from this process
357 fn is_local_worker(upid: &UPID) -> bool {
358 upid.pid == crate::pid() && upid.pstart == crate::pstart()
359 }
360
361 /// Test if the task is still running
362 pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
363 if is_local_worker(upid) {
364 return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
365 }
366
367 if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() {
368 return Ok(false);
369 }
370
371 let sock = crate::ctrl_sock_from_pid(upid.pid);
372 let cmd = json!({
373 "command": "worker-task-status",
374 "args": {
375 "upid": upid.to_string(),
376 },
377 });
378 let status = crate::send_command(sock, &cmd).await?;
379
380 if let Some(active) = status.as_bool() {
381 Ok(active)
382 } else {
383 bail!("got unexpected result {:?} (expected bool)", status);
384 }
385 }
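// Illustrative sketch (async context assumed, `upid` parsed as above):
//
//     if worker_is_active(&upid).await? {
//         println!("task {} is still running", upid);
//     }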
386
387 /// Test if the task is still running (fast but inaccurate implementation)
388 ///
389 /// If the task is spawned from a different process, we simply return whether
390 /// that process is still running. This information is good enough to detect
391 /// stale tasks...
392 pub fn worker_is_active_local(upid: &UPID) -> bool {
393 if is_local_worker(upid) {
394 WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
395 } else {
396 procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
397 }
398 }
399
400 /// Register task control commands on a [CommandSocket].
401 ///
402 /// This registers two commands:
403 ///
404 /// * ``worker-task-abort <UPID>``: calls [abort_local_worker]
405 ///
406 /// * ``worker-task-status <UPID>``: returns true or false, depending on
407 /// whether the worker is running or stopped.
408 pub fn register_task_control_commands(
409 commando_sock: &mut CommandSocket,
410 ) -> Result<(), Error> {
411 fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
412 let args = if let Some(args) = args { args } else { bail!("missing args") };
413 let upid = match args.get("upid") {
414 Some(Value::String(upid)) => upid.parse::<UPID>()?,
415 None => bail!("no upid in args"),
416 _ => bail!("unable to parse upid"),
417 };
418 if !is_local_worker(&upid) {
419 bail!("upid does not belong to this process");
420 }
421 Ok(upid)
422 }
423
424 commando_sock.register_command("worker-task-abort".into(), move |args| {
425 let upid = get_upid(args)?;
426
427 abort_local_worker(upid);
428
429 Ok(Value::Null)
430 })?;
431 commando_sock.register_command("worker-task-status".into(), move |args| {
432 let upid = get_upid(args)?;
433
434 let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id);
435
436 Ok(active.into())
437 })?;
438
439 Ok(())
440 }
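// Illustrative sketch (assumption: `commando_sock` comes from the surrounding
// server setup that creates the CommandSocket):
//
//     register_task_control_commands(&mut commando_sock)?;
//     // afterwards the socket answers "worker-task-abort" and "worker-task-status"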
441
442 /// Try to abort a worker task, but do not wait
443 ///
444 /// Errors (if any) are simply logged.
445 pub fn abort_worker_nowait(upid: UPID) {
446 tokio::spawn(async move {
447 if let Err(err) = abort_worker(upid).await {
448 log::error!("abort worker task failed - {}", err);
449 }
450 });
451 }
452
453 /// Abort a worker task
454 ///
455 /// By sending ``worker-task-abort`` to the control socket.
456 pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
457
458 let sock = crate::ctrl_sock_from_pid(upid.pid);
459 let cmd = json!({
460 "command": "worker-task-abort",
461 "args": {
462 "upid": upid.to_string(),
463 },
464 });
465 crate::send_command(sock, &cmd).map_ok(|_| ()).await
466 }
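// Illustrative sketch: abort a task and wait for the command to be sent, or
// fire-and-forget from synchronous code (`upid` is an example value):
//
//     abort_worker(upid.clone()).await?;
//     // or, without awaiting:
//     abort_worker_nowait(upid);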
467
468 fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
469
470 let data = line.splitn(3, ' ').collect::<Vec<&str>>();
471
472 let len = data.len();
473
474 match len {
475 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)),
476 3 => {
477 let endtime = i64::from_str_radix(data[1], 16)?;
478 let state = TaskState::from_endtime_and_message(endtime, data[2])?;
479 Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state)))
480 }
481 _ => bail!("wrong number of components"),
482 }
483 }
484
485 /// Task State
486 #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
487 pub enum TaskState {
488 /// The Task ended with an undefined state
489 Unknown { endtime: i64 },
490 /// The Task ended and there were no errors or warnings
491 OK { endtime: i64 },
492 /// The Task ended with 'count' warnings and no errors
493 Warning { count: u64, endtime: i64 },
494 /// The Task ended with the error described in 'message'
495 Error { message: String, endtime: i64 },
496 }
497
498 impl TaskState {
499 pub fn endtime(&self) -> i64 {
500 match *self {
501 TaskState::Unknown { endtime } => endtime,
502 TaskState::OK { endtime } => endtime,
503 TaskState::Warning { endtime, .. } => endtime,
504 TaskState::Error { endtime, .. } => endtime,
505 }
506 }
507
508 fn result_text(&self) -> String {
509 match self {
510 TaskState::Error { message, .. } => format!("TASK ERROR: {}", message),
511 other => format!("TASK {}", other),
512 }
513 }
514
515 fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> {
516 if s == "unknown" {
517 Ok(TaskState::Unknown { endtime })
518 } else if s == "OK" {
519 Ok(TaskState::OK { endtime })
520 } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") {
521 let count: u64 = warnings.parse()?;
522 Ok(TaskState::Warning{ count, endtime })
523 } else if !s.is_empty() {
524 let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string();
525 Ok(TaskState::Error{ message, endtime })
526 } else {
527 bail!("unable to parse Task Status '{}'", s);
528 }
529 }
530 }
531
532 impl std::cmp::PartialOrd for TaskState {
533 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
534 Some(self.endtime().cmp(&other.endtime()))
535 }
536 }
537
538 impl std::cmp::Ord for TaskState {
539 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
540 self.endtime().cmp(&other.endtime())
541 }
542 }
543
544 impl std::fmt::Display for TaskState {
545 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
546 match self {
547 TaskState::Unknown { .. } => write!(f, "unknown"),
548 TaskState::OK { .. }=> write!(f, "OK"),
549 TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count),
550 TaskState::Error { message, .. } => write!(f, "{}", message),
551 }
552 }
553 }
554
555 /// Task details including parsed UPID
556 ///
557 /// If there is no `state`, the task is still running.
558 #[derive(Debug)]
559 pub struct TaskListInfo {
560 /// The parsed UPID
561 pub upid: UPID,
562 /// UPID string representation
563 pub upid_str: String,
564 /// Task `(endtime, status)` if already finished
565 pub state: Option<TaskState>, // endtime, status
566 }
567
568 fn render_task_line(info: &TaskListInfo) -> String {
569 let mut raw = String::new();
570 if let Some(status) = &info.state {
571 raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status));
572 } else {
573 raw.push_str(&info.upid_str);
574 raw.push('\n');
575 }
576
577 raw
578 }
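// For illustration, a rendered task list line has one of these two shapes
// (UPID text abbreviated, endtime written as 8 hex digits):
//
//     <upid>                         (still running, no state yet)
//     <upid> 6156E4D2 TASK OK        (finished: "<upid> <endtime hex> <status text>")
//
// which is exactly what `parse_worker_status_line` above accepts.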
579
580 fn render_task_list(list: &[TaskListInfo]) -> String {
581 let mut raw = String::new();
582 for info in list {
583 raw.push_str(&render_task_line(&info));
584 }
585 raw
586 }
587
588 // note: this is not locked, the caller has to make sure it is
589 // this will skip (and log) lines that are not valid status lines
590 fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error>
591 {
592 let reader = BufReader::new(reader);
593 let mut list = Vec::new();
594 for line in reader.lines() {
595 let line = line?;
596 match parse_worker_status_line(&line) {
597 Ok((upid_str, upid, state)) => list.push(TaskListInfo {
598 upid_str,
599 upid,
600 state
601 }),
602 Err(err) => {
603 log::warn!("unable to parse worker status '{}' - {}", line, err);
604 continue;
605 }
606 };
607 }
608
609 Ok(list)
610 }
611
612 // note: this is not locked, the caller has to make sure it is
613 fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error>
614 where
615 P: AsRef<std::path::Path> + std::fmt::Debug,
616 {
617 let file = match File::open(&path) {
618 Ok(f) => f,
619 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
620 Err(err) => bail!("unable to open task list {:?} - {}", path, err),
621 };
622
623 read_task_file(file)
624 }
625
626 /// Iterate over existing/active worker tasks
627 pub struct TaskListInfoIterator {
628 list: VecDeque<TaskListInfo>,
629 end: bool,
630 archive: Option<LogRotateFiles>,
631 lock: Option<TaskListLockGuard>,
632 }
633
634 impl TaskListInfoIterator {
635 /// Creates a new iterator instance.
636 pub fn new(active_only: bool) -> Result<Self, Error> {
637
638 let setup = worker_task_setup()?;
639
640 let (read_lock, active_list) = {
641 let lock = setup.lock_task_list_files(false)?;
642 let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
643
644 let needs_update = active_list
645 .iter()
646 .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
647
648 // TODO remove with 1.x
649 let index_exists = setup.task_index_fn.is_file();
650
651 if needs_update || index_exists {
652 drop(lock);
653 setup.update_active_workers(None)?;
654 let lock = setup.lock_task_list_files(false)?;
655 let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
656 (lock, active_list)
657 } else {
658 (lock, active_list)
659 }
660 };
661
662 let archive = if active_only {
663 None
664 } else {
665 let logrotate = LogRotate::new(&setup.task_archive_fn, true, None, None)?;
666 Some(logrotate.files())
667 };
668
669 let lock = if active_only { None } else { Some(read_lock) };
670
671 Ok(Self {
672 list: active_list.into(),
673 end: active_only,
674 archive,
675 lock,
676 })
677 }
678 }
679
680 impl Iterator for TaskListInfoIterator {
681 type Item = Result<TaskListInfo, Error>;
682
683 fn next(&mut self) -> Option<Self::Item> {
684 loop {
685 if let Some(element) = self.list.pop_back() {
686 return Some(Ok(element));
687 } else if self.end {
688 return None;
689 } else {
690 if let Some(mut archive) = self.archive.take() {
691 if let Some(file) = archive.next() {
692 let list = match read_task_file(file) {
693 Ok(list) => list,
694 Err(err) => return Some(Err(err)),
695 };
696 self.list.append(&mut list.into());
697 self.archive = Some(archive);
698 continue;
699 }
700 }
701
702 self.end = true;
703 self.lock.take();
704 }
705 }
706 }
707 }
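// Illustrative sketch: iterate over the active tasks and then the archived
// ones (pass `true` to restrict the listing to active tasks):
//
//     for info in TaskListInfoIterator::new(false)? {
//         let info = info?;
//         println!("{} -> {:?}", info.upid_str, info.state);
//     }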
708
709 /// Launch long running worker tasks.
710 ///
711 /// A worker task can either be a whole thread, or simply a tokio
712 /// task/future. Each task can `log()` messages, which are stored
713 /// persistently to files. Tasks should poll the `abort_requested`
714 /// flag, and stop execution when requested.
715 pub struct WorkerTask {
716 setup: &'static WorkerTaskSetup,
717 upid: UPID,
718 data: Mutex<WorkerTaskData>,
719 abort_requested: AtomicBool,
720 }
721
722 impl std::fmt::Display for WorkerTask {
723
724 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
725 self.upid.fmt(f)
726 }
727 }
728
729 struct WorkerTaskData {
730 logger: FileLogger,
731 progress: f64, // 0..1
732 warn_count: u64,
733 pub abort_listeners: Vec<oneshot::Sender<()>>,
734 }
735
736 impl WorkerTask {
737
738 pub fn new(
739 worker_type: &str,
740 worker_id: Option<String>,
741 auth_id: String,
742 to_stdout: bool,
743 ) -> Result<Arc<Self>, Error> {
744
745 let setup = worker_task_setup()?;
746
747 let upid = UPID::new(worker_type, worker_id, auth_id)?;
748 let task_id = upid.task_id;
749
750 let mut path = setup.taskdir.clone();
751
752 path.push(format!("{:02X}", upid.pstart & 255));
753
754 let dir_opts = setup.file_opts.clone()
755 .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
756
757 create_path(&path, None, Some(dir_opts))?;
758
759 path.push(upid.to_string());
760
761 let logger_options = FileLogOptions {
762 to_stdout,
763 exclusive: true,
764 prefix_time: true,
765 read: true,
766 file_opts: setup.file_opts.clone(),
767 ..Default::default()
768 };
769 let logger = FileLogger::new(&path, logger_options)?;
770
771 let worker = Arc::new(Self {
772 setup,
773 upid: upid.clone(),
774 abort_requested: AtomicBool::new(false),
775 data: Mutex::new(WorkerTaskData {
776 logger,
777 progress: 0.0,
778 warn_count: 0,
779 abort_listeners: vec![],
780 }),
781 });
782
783 // scope to drop the lock again after inserting
784 {
785 let mut hash = WORKER_TASK_LIST.lock().unwrap();
786 hash.insert(task_id, worker.clone());
787 crate::set_worker_count(hash.len());
788 }
789
790 setup.update_active_workers(Some(&upid))?;
791
792 Ok(worker)
793 }
794
795 /// Spawn a new tokio task/future.
796 pub fn spawn<F, T>(
797 worker_type: &str,
798 worker_id: Option<String>,
799 auth_id: String,
800 to_stdout: bool,
801 f: F,
802 ) -> Result<String, Error>
803 where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
804 T: Send + 'static + Future<Output = Result<(), Error>>,
805 {
806 let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
807 let upid_str = worker.upid.to_string();
808 let f = f(worker.clone());
809 tokio::spawn(async move {
810 let result = f.await;
811 worker.log_result(&result);
812 });
813
814 Ok(upid_str)
815 }
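// Illustrative sketch (worker type, id and auth id are hypothetical example
// values; the returned string is the task's UPID):
//
//     let upid_str = WorkerTask::spawn(
//         "exampletask",              // worker_type
//         None,                       // worker_id
//         "root@pam".to_string(),     // auth_id
//         false,                      // to_stdout
//         |worker| async move {
//             worker.log_message("doing some async work ...");
//             Ok(())
//         },
//     )?;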
816
817 /// Create a new worker thread.
818 pub fn new_thread<F>(
819 worker_type: &str,
820 worker_id: Option<String>,
821 auth_id: String,
822 to_stdout: bool,
823 f: F,
824 ) -> Result<String, Error>
825 where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>
826 {
827 let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
828 let upid_str = worker.upid.to_string();
829
830 let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || {
831 let worker1 = worker.clone();
832 let result = match std::panic::catch_unwind(move || f(worker1)) {
833 Ok(r) => r,
834 Err(panic) => {
835 match panic.downcast::<&str>() {
836 Ok(panic_msg) => {
837 Err(format_err!("worker panicked: {}", panic_msg))
838 }
839 Err(_) => {
840 Err(format_err!("worker panicked: unknown type."))
841 }
842 }
843 }
844 };
845
846 worker.log_result(&result);
847 });
848
849 Ok(upid_str)
850 }
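// Illustrative sketch (example values as above; the closure runs on its own
// thread, so blocking calls are fine here):
//
//     let upid_str = WorkerTask::new_thread(
//         "examplethread",
//         Some("job1".to_string()),
//         "root@pam".to_string(),
//         false,
//         |worker| {
//             worker.log_message("doing some blocking work ...");
//             Ok(())
//         },
//     )?;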
851
852 /// Create a task state from `self` and a result
853 pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
854 let warn_count = self.data.lock().unwrap().warn_count;
855
856 let endtime = proxmox_time::epoch_i64();
857
858 if let Err(err) = result {
859 TaskState::Error { message: err.to_string(), endtime }
860 } else if warn_count > 0 {
861 TaskState::Warning { count: warn_count, endtime }
862 } else {
863 TaskState::OK { endtime }
864 }
865 }
866
867 /// Log task result, remove task from running list
868 pub fn log_result(&self, result: &Result<(), Error>) {
869 let state = self.create_state(result);
870 self.log_message(state.result_text());
871
872 WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
873 let _ = self.setup.update_active_workers(None);
874 crate::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
875 }
876
877 /// Log a message.
878 pub fn log_message<S: AsRef<str>>(&self, msg: S) {
879 let mut data = self.data.lock().unwrap();
880 data.logger.log(msg);
881 }
882
883 /// Log a message as warning.
884 pub fn log_warning<S: AsRef<str>>(&self, msg: S) {
885 let mut data = self.data.lock().unwrap();
886 data.logger.log(format!("WARN: {}", msg.as_ref()));
887 data.warn_count += 1;
888 }
889
890 /// Set progress indicator
891 pub fn progress(&self, progress: f64) {
892 if progress >= 0.0 && progress <= 1.0 {
893 let mut data = self.data.lock().unwrap();
894 data.progress = progress;
895 } else {
896 // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
897 }
898 }
899
900 /// Request abort
901 pub fn request_abort(&self) {
902 let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst);
903 if !prev_abort { // log abort one time
904 self.log_message("received abort request ...");
905 }
906 // notify listeners
907 let mut data = self.data.lock().unwrap();
908 loop {
909 match data.abort_listeners.pop() {
910 None => { break; },
911 Some(ch) => {
912 let _ = ch.send(()); // ignore errors here
913 },
914 }
915 }
916 }
917
918 /// Get a future which resolves on task abort
919 pub fn abort_future(&self) -> oneshot::Receiver<()> {
920 let (tx, rx) = oneshot::channel::<()>();
921
922 let mut data = self.data.lock().unwrap();
923 if self.abort_requested() {
924 let _ = tx.send(());
925 } else {
926 data.abort_listeners.push(tx);
927 }
928 rx
929 }
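// Illustrative sketch (inside a worker future; `do_work()` is a hypothetical
// async helper returning Result<(), Error>):
//
//     tokio::select! {
//         _ = worker.abort_future() => bail!("task aborted"),
//         res = do_work() => res?,
//     }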
930
931 pub fn upid(&self) -> &UPID {
932 &self.upid
933 }
934 }
935
936 impl WorkerTaskContext for WorkerTask {
937
938 fn abort_requested(&self) -> bool {
939 self.abort_requested.load(Ordering::SeqCst)
940 }
941
942 fn shutdown_requested(&self) -> bool {
943 crate::shutdown_requested()
944 }
945
946 fn fail_on_shutdown(&self) -> Result<(), Error> {
947 crate::fail_on_shutdown()
948 }
949
950 fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
951 match level {
952 log::Level::Error => self.log_warning(&message.to_string()),
953 log::Level::Warn => self.log_warning(&message.to_string()),
954 log::Level::Info => self.log_message(&message.to_string()),
955 log::Level::Debug => self.log_message(&format!("DEBUG: {}", message)),
956 log::Level::Trace => self.log_message(&format!("TRACE: {}", message)),
957 }
958 }
959 }
960
961 /// Wait for a locally spawned worker task
962 ///
963 /// Note: local workers should print logs to stdout, so there is no
964 /// need to fetch/display logs. We just wait for the worker to finish.
965 pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
966
967 let upid: UPID = upid_str.parse()?;
968
969 let sleep_duration = core::time::Duration::new(0, 100_000_000);
970
971 loop {
972 if worker_is_active_local(&upid) {
973 tokio::time::sleep(sleep_duration).await;
974 } else {
975 break;
976 }
977 }
978 Ok(())
979 }
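// Illustrative sketch (`upid_str` comes from one of the constructors above):
//
//     wait_for_local_worker(&upid_str).await?;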
980
981 /// Request abort of a local worker (if existing and running)
982 pub fn abort_local_worker(upid: UPID) {
983 if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) {
984 worker.request_abort();
985 }
986 }