]> git.proxmox.com Git - proxmox-backup.git/blob - proxmox-rest-server/src/worker_task.rs
update to first proxmox crate split
[proxmox-backup.git] / proxmox-rest-server / src / worker_task.rs
1 use std::collections::{HashMap, VecDeque};
2 use std::fs::File;
3 use std::path::PathBuf;
4 use std::io::{Read, Write, BufRead, BufReader};
5 use std::panic::UnwindSafe;
6 use std::sync::atomic::{AtomicBool, Ordering};
7 use std::sync::{Arc, Mutex};
8 use std::time::{SystemTime, Duration};
9
10 use anyhow::{bail, format_err, Error};
11 use futures::*;
12 use lazy_static::lazy_static;
13 use serde_json::{json, Value};
14 use serde::{Serialize, Deserialize};
15 use tokio::sync::oneshot;
16 use nix::fcntl::OFlag;
17 use once_cell::sync::OnceCell;
18
19 use proxmox::sys::linux::procfs;
20 use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
21 use proxmox_lang::try_block;
22 use proxmox_schema::upid::UPID;
23
24 use pbs_tools::task::WorkerTaskContext;
25 use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
26
27 use crate::{CommandSocket, FileLogger, FileLogOptions};
28
/// Guard holding the task-list lock file open; the lock is released
/// when this guard is dropped (closing the `File`).
struct TaskListLockGuard(File);
30
/// Paths and file-creation options shared by all worker-task file
/// operations (initialized once via [init_worker_tasks]).
struct WorkerTaskSetup {
    // ownership/permission options applied to every file we create
    file_opts: CreateOptions,
    // base directory holding per-task log files (in hex-named shards)
    taskdir: PathBuf,
    // lock file protecting updates to active/index/archive
    task_lock_fn: PathBuf,
    // list of currently running tasks
    active_tasks_fn: PathBuf,
    // legacy (pre-1.x) index file, removed once migrated
    task_index_fn: PathBuf,
    // append-only archive of finished tasks
    task_archive_fn: PathBuf,
}
39
// Global singleton, set exactly once by init_worker_tasks().
static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
41
42 fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
43 WORKER_TASK_SETUP.get()
44 .ok_or_else(|| format_err!("WorkerTask library is not initialized"))
45 }
46
impl WorkerTaskSetup {

    /// Derive all bookkeeping paths from `basedir` (everything lives
    /// under `<basedir>/tasks`).
    fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {

        let mut taskdir = basedir.clone();
        taskdir.push("tasks");

        let mut task_lock_fn = taskdir.clone();
        task_lock_fn.push(".active.lock");

        let mut active_tasks_fn = taskdir.clone();
        active_tasks_fn.push("active");

        let mut task_index_fn = taskdir.clone();
        task_index_fn.push("index");

        let mut task_archive_fn = taskdir.clone();
        task_archive_fn.push("archive");

        Self {
            file_opts,
            taskdir,
            task_lock_fn,
            active_tasks_fn,
            task_index_fn,
            task_archive_fn,
        }
    }

    /// Take the task-list lock (shared or exclusive), waiting up to 10
    /// seconds. The lock is held for the lifetime of the returned guard.
    fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {
        let options = self.file_opts.clone()
            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));

        let timeout = std::time::Duration::new(10, 0);

        let file = proxmox::tools::fs::open_file_locked(
            &self.task_lock_fn,
            timeout,
            exclusive,
            options,
        )?;

        Ok(TaskListLockGuard(file))
    }

    /// Path of the log file for `upid`: task logs are sharded into 256
    /// hex-named subdirectories by `pstart % 256`.
    fn log_path(&self, upid: &UPID) -> std::path::PathBuf {
        let mut path = self.taskdir.clone();
        path.push(format!("{:02X}", upid.pstart % 256));
        path.push(upid.to_string());
        path
    }

    // atomically read/update the task list, update status of finished tasks
    // new_upid is added to the list when specified.
    fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {

        // exclusive lock: we rewrite `active` and append to `archive`
        let lock = self.lock_task_list_files(true)?;

        // TODO remove with 1.x
        // legacy `index` entries are merged into the finished list and
        // the file is removed below
        let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?;
        let had_index_file = !finish_list.is_empty();

        // We use filter_map because one negative case wants to *move* the data into `finish_list`,
        // clippy doesn't quite catch this!
        #[allow(clippy::unnecessary_filter_map)]
        let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)?
            .into_iter()
            .filter_map(|info| {
                if info.state.is_some() {
                    // this can happen when the active file still includes finished tasks
                    finish_list.push(info);
                    return None;
                }

                if !worker_is_active_local(&info.upid) {
                    // println!("Detected stopped task '{}'", &info.upid_str);
                    // task process is gone: read its final state from the
                    // log file, falling back to Unknown at "now"
                    let now = proxmox_time::epoch_i64();
                    let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
                    finish_list.push(TaskListInfo {
                        upid: info.upid,
                        upid_str: info.upid_str,
                        state: Some(status)
                    });
                    return None;
                }

                Some(info)
            }).collect();

        if let Some(upid) = new_upid {
            active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
        }

        let active_raw = render_task_list(&active_list);

        let options = self.file_opts.clone()
            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));

        replace_file(
            &self.active_tasks_fn,
            active_raw.as_bytes(),
            options,
        )?;

        // keep archive entries ordered by end time; running entries
        // (no state) sort after finished ones, by start time
        finish_list.sort_unstable_by(|a, b| {
            match (&a.state, &b.state) {
                (Some(s1), Some(s2)) => s1.cmp(&s2),
                (Some(_), None) => std::cmp::Ordering::Less,
                (None, Some(_)) => std::cmp::Ordering::Greater,
                _ => a.upid.starttime.cmp(&b.upid.starttime),
            }
        });

        if !finish_list.is_empty() {
            let options = self.file_opts.clone()
                .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));

            // append finished entries to the (never rewritten) archive
            let mut writer = atomic_open_or_create_file(
                &self.task_archive_fn,
                OFlag::O_APPEND | OFlag::O_RDWR,
                &[],
                options,
            )?;
            for info in &finish_list {
                writer.write_all(render_task_line(&info).as_bytes())?;
            }
        }

        // TODO Remove with 1.x
        // for compatibility, if we had an INDEX file, we do not need it anymore
        if had_index_file {
            let _ = nix::unistd::unlink(&self.task_index_fn);
        }

        drop(lock);

        Ok(())
    }

    // Create task log directory with correct permissions
    fn create_task_log_dirs(&self) -> Result<(), Error> {

        try_block!({
            let dir_opts = self.file_opts.clone()
                .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));

            create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?;
            // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
            Ok(())
        }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
    }
}
199
200 /// Initialize the WorkerTask library
201 pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
202 let setup = WorkerTaskSetup::new(basedir, file_opts);
203 setup.create_task_log_dirs()?;
204 WORKER_TASK_SETUP.set(setup)
205 .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
206 }
207
/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
/// rotates it if it is
pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {

    let setup = worker_task_setup()?;

    // exclusive lock so nobody appends to the archive while we rotate
    let _lock = setup.lock_task_list_files(true)?;

    let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress)
        .ok_or_else(|| format_err!("could not get archive file names"))?;

    logrotate.rotate(size_threshold, None, max_files)
}
221
222 /// removes all task logs that are older than the oldest task entry in the
223 /// task archive
224 pub fn cleanup_old_tasks(compressed: bool) -> Result<(), Error> {
225 let setup = worker_task_setup()?;
226
227 let _lock = setup.lock_task_list_files(true)?;
228
229 let logrotate = LogRotate::new(&setup.task_archive_fn, compressed)
230 .ok_or_else(|| format_err!("could not get archive file names"))?;
231
232 let mut timestamp = None;
233 if let Some(last_file) = logrotate.files().last() {
234 let reader = BufReader::new(last_file);
235 for line in reader.lines() {
236 let line = line?;
237 if let Ok((_, _, Some(state))) = parse_worker_status_line(&line) {
238 timestamp = Some(state.endtime());
239 break;
240 }
241 }
242 }
243
244 fn get_modified(entry: std::fs::DirEntry) -> Result<SystemTime, std::io::Error> {
245 entry.metadata()?.modified()
246 }
247
248 if let Some(timestamp) = timestamp {
249 let cutoff_time = if timestamp > 0 {
250 SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(timestamp as u64))
251 } else {
252 SystemTime::UNIX_EPOCH.checked_sub(Duration::from_secs(-timestamp as u64))
253 }.ok_or_else(|| format_err!("could not calculate cutoff time"))?;
254
255 for i in 0..256 {
256 let mut path = setup.taskdir.clone();
257 path.push(format!("{:02X}", i));
258 for file in std::fs::read_dir(path)? {
259 let file = file?;
260 let path = file.path();
261
262 let modified = get_modified(file)
263 .map_err(|err| format_err!("error getting mtime for {:?}: {}", path, err))?;
264
265 if modified < cutoff_time {
266 match std::fs::remove_file(path) {
267 Ok(()) => {},
268 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {},
269 Err(err) => bail!("could not remove file: {}", err),
270 }
271 }
272 }
273 }
274 }
275
276 Ok(())
277 }
278
279
280 /// Path to the worker log file
281 pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
282 let setup = worker_task_setup()?;
283 Ok(setup.log_path(upid))
284 }
285
/// Read endtime (time of last log line) and exitstatus from task log file
/// If there is not a single line with at valid datetime, we assume the
/// starttime to be the endtime
pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {

    let setup = worker_task_setup()?;

    // fallback: no parsable log line -> unknown state at start time
    let mut status = TaskState::Unknown { endtime: upid.starttime };

    let path = setup.log_path(upid);

    let mut file = File::open(path)?;

    // speedup - only read tail (the final status is on the last line,
    // so the last 8 KiB are enough)
    use std::io::Seek;
    use std::io::SeekFrom;
    let _ = file.seek(SeekFrom::End(-8192)); // ignore errors (file may be shorter)

    let mut data = Vec::with_capacity(8192);
    file.read_to_end(&mut data)?;

    // strip newlines at the end of the task logs
    while data.last() == Some(&b'\n') {
        data.pop();
    }

    // isolate the last line
    let last_line = match data.iter().rposition(|c| *c == b'\n') {
        Some(start) if data.len() > (start+1) => &data[start+1..],
        Some(_) => &data, // should not happen, since we removed all trailing newlines
        None => &data,
    };

    let last_line = std::str::from_utf8(last_line)
        .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;

    // a final line looks like: "<rfc3339 timestamp>: TASK <status>"
    let mut iter = last_line.splitn(2, ": ");
    if let Some(time_str) = iter.next() {
        if let Ok(endtime) = proxmox_time::parse_rfc3339(time_str) {
            // set the endtime even if we cannot parse the state
            status = TaskState::Unknown { endtime };
            if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
                if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
                    status = state;
                }
            }
        }
    }

    Ok(status)
}
336
lazy_static! {
    // all worker tasks spawned by *this* process, keyed by UPID task_id
    static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
}
340
341 /// checks if the task UPID refers to a worker from this process
342 fn is_local_worker(upid: &UPID) -> bool {
343 upid.pid == crate::pid() && upid.pstart == crate::pstart()
344 }
345
346 /// Test if the task is still running
347 pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
348 if is_local_worker(upid) {
349 return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
350 }
351
352 if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() {
353 return Ok(false);
354 }
355
356 let sock = crate::ctrl_sock_from_pid(upid.pid);
357 let cmd = json!({
358 "command": "worker-task-status",
359 "args": {
360 "upid": upid.to_string(),
361 },
362 });
363 let status = crate::send_command(sock, &cmd).await?;
364
365 if let Some(active) = status.as_bool() {
366 Ok(active)
367 } else {
368 bail!("got unexpected result {:?} (expected bool)", status);
369 }
370 }
371
372 /// Test if the task is still running (fast but inaccurate implementation)
373 ///
374 /// If the task is spawned from a different process, we simply return if
375 /// that process is still running. This information is good enough to detect
376 /// stale tasks...
377 pub fn worker_is_active_local(upid: &UPID) -> bool {
378 if is_local_worker(upid) {
379 WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
380 } else {
381 procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
382 }
383 }
384
/// Register task control command on a [CommandSocket].
///
/// This create two commands:
///
/// * ``worker-task-abort <UPID>``: calls [abort_local_worker]
///
/// * ``worker-task-status <UPID>``: return true of false, depending on
/// whether the worker is running or stopped.
pub fn register_task_control_commands(
    commando_sock: &mut CommandSocket,
) -> Result<(), Error> {
    // extract and validate the "upid" argument; both commands only
    // operate on workers owned by this process
    fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
        let args = if let Some(args) = args { args } else { bail!("missing args") };
        let upid = match args.get("upid") {
            Some(Value::String(upid)) => upid.parse::<UPID>()?,
            None => bail!("no upid in args"),
            _ => bail!("unable to parse upid"),
        };
        if !is_local_worker(&upid) {
            bail!("upid does not belong to this process");
        }
        Ok(upid)
    }

    commando_sock.register_command("worker-task-abort".into(), move |args| {
        let upid = get_upid(args)?;

        abort_local_worker(upid);

        Ok(Value::Null)
    })?;
    commando_sock.register_command("worker-task-status".into(), move |args| {
        let upid = get_upid(args)?;

        // look the task up in the process-local list
        let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id);

        Ok(active.into())
    })?;

    Ok(())
}
426
427 /// Try to abort a worker task, but do no wait
428 ///
429 /// Errors (if any) are simply logged.
430 pub fn abort_worker_nowait(upid: UPID) {
431 tokio::spawn(async move {
432 if let Err(err) = abort_worker(upid).await {
433 log::error!("abort worker task failed - {}", err);
434 }
435 });
436 }
437
438 /// Abort a worker task
439 ///
440 /// By sending ``worker-task-abort`` to the control socket.
441 pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
442
443 let sock = crate::ctrl_sock_from_pid(upid.pid);
444 let cmd = json!({
445 "command": "worker-task-abort",
446 "args": {
447 "upid": upid.to_string(),
448 },
449 });
450 crate::send_command(sock, &cmd).map_ok(|_| ()).await
451 }
452
453 fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
454
455 let data = line.splitn(3, ' ').collect::<Vec<&str>>();
456
457 let len = data.len();
458
459 match len {
460 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)),
461 3 => {
462 let endtime = i64::from_str_radix(data[1], 16)?;
463 let state = TaskState::from_endtime_and_message(endtime, data[2])?;
464 Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state)))
465 }
466 _ => bail!("wrong number of components"),
467 }
468 }
469
/// Task State
///
/// Rendered into task-file lines (see `render_task_line`) and parsed
/// back via [TaskState::from_endtime_and_message].
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskState {
    /// The Task ended with an undefined state
    Unknown { endtime: i64 },
    /// The Task ended and there were no errors or warnings
    OK { endtime: i64 },
    /// The Task had 'count' amount of warnings and no errors
    Warning { count: u64, endtime: i64 },
    /// The Task ended with the error described in 'message'
    Error { message: String, endtime: i64 },
}
482
483 impl TaskState {
484 pub fn endtime(&self) -> i64 {
485 match *self {
486 TaskState::Unknown { endtime } => endtime,
487 TaskState::OK { endtime } => endtime,
488 TaskState::Warning { endtime, .. } => endtime,
489 TaskState::Error { endtime, .. } => endtime,
490 }
491 }
492
493 fn result_text(&self) -> String {
494 match self {
495 TaskState::Error { message, .. } => format!("TASK ERROR: {}", message),
496 other => format!("TASK {}", other),
497 }
498 }
499
500 fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> {
501 if s == "unknown" {
502 Ok(TaskState::Unknown { endtime })
503 } else if s == "OK" {
504 Ok(TaskState::OK { endtime })
505 } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") {
506 let count: u64 = warnings.parse()?;
507 Ok(TaskState::Warning{ count, endtime })
508 } else if !s.is_empty() {
509 let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string();
510 Ok(TaskState::Error{ message, endtime })
511 } else {
512 bail!("unable to parse Task Status '{}'", s);
513 }
514 }
515 }
516
517 impl std::cmp::PartialOrd for TaskState {
518 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
519 Some(self.endtime().cmp(&other.endtime()))
520 }
521 }
522
impl std::cmp::Ord for TaskState {
    // Tasks are ordered by end time only (used when sorting the archive).
    //
    // NOTE(review): this is inconsistent with the derived `PartialEq`
    // (two different states with equal endtime compare `Equal` here but
    // not `==`), which technically violates the `Ord` contract - confirm
    // callers only use this for endtime sorting.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.endtime().cmp(&other.endtime())
    }
}
528
impl std::fmt::Display for TaskState {
    // Render the status-message part of a task-file line; this is the
    // text that `from_endtime_and_message` parses back (note: `Error`
    // prints the bare message, the "ERROR: " prefix is only added by
    // `result_text`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TaskState::Unknown { .. } => write!(f, "unknown"),
            TaskState::OK { .. }=> write!(f, "OK"),
            TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count),
            TaskState::Error { message, .. } => write!(f, "{}", message),
        }
    }
}
539
/// Task details including parsed UPID
///
/// If there is no `state`, the task is still running.
#[derive(Debug)]
pub struct TaskListInfo {
    /// The parsed UPID
    pub upid: UPID,
    /// UPID string representation (kept so task lines can be re-rendered
    /// verbatim, see `render_task_line`)
    pub upid_str: String,
    /// Task `(endtime, status)` if already finished
    pub state: Option<TaskState>, // endtime, status
}
552
553 fn render_task_line(info: &TaskListInfo) -> String {
554 let mut raw = String::new();
555 if let Some(status) = &info.state {
556 raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status));
557 } else {
558 raw.push_str(&info.upid_str);
559 raw.push('\n');
560 }
561
562 raw
563 }
564
565 fn render_task_list(list: &[TaskListInfo]) -> String {
566 let mut raw = String::new();
567 for info in list {
568 raw.push_str(&render_task_line(&info));
569 }
570 raw
571 }
572
// note this is not locked, caller has to make sure it is
// this will skip (and log) lines that are not valid status lines
fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error>
{
    let reader = BufReader::new(reader);
    let mut list = Vec::new();
    for line in reader.lines() {
        let line = line?;
        match parse_worker_status_line(&line) {
            Ok((upid_str, upid, state)) => list.push(TaskListInfo {
                upid_str,
                upid,
                state
            }),
            Err(err) => {
                // tolerate corrupt lines: warn and keep going so one bad
                // entry does not make the whole list unreadable
                log::warn!("unable to parse worker status '{}' - {}", line, err);
                continue;
            }
        };
    }

    Ok(list)
}
596
597 // note this is not locked, caller has to make sure it is
598 fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error>
599 where
600 P: AsRef<std::path::Path> + std::fmt::Debug,
601 {
602 let file = match File::open(&path) {
603 Ok(f) => f,
604 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
605 Err(err) => bail!("unable to open task list {:?} - {}", path, err),
606 };
607
608 read_task_file(file)
609 }
610
/// Iterate over existing/active worker tasks
pub struct TaskListInfoIterator {
    // entries not yet handed out (popped from the back)
    list: VecDeque<TaskListInfo>,
    // set once the active list and all archive files are exhausted
    end: bool,
    // remaining archive files (None when iterating active tasks only)
    archive: Option<LogRotateFiles>,
    // shared lock held while the archive is still being read
    lock: Option<TaskListLockGuard>,
}
618
impl TaskListInfoIterator {
    /// Creates a new iterator instance.
    ///
    /// With `active_only`, only the active task list is read and no lock
    /// is held while iterating; otherwise the archive files are included
    /// and a shared lock is kept until the iterator is exhausted.
    pub fn new(active_only: bool) -> Result<Self, Error> {

        let setup = worker_task_setup()?;

        let (read_lock, active_list) = {
            let lock = setup.lock_task_list_files(false)?;
            let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;

            // the active file needs rewriting when it still contains
            // finished entries or entries whose process is gone
            let needs_update = active_list
                .iter()
                .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));

            // TODO remove with 1.x
            let index_exists = setup.task_index_fn.is_file();

            if needs_update || index_exists {
                // update_active_workers() takes the exclusive lock, so
                // drop our shared lock first and re-acquire afterwards
                drop(lock);
                setup.update_active_workers(None)?;
                let lock = setup.lock_task_list_files(false)?;
                let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
                (lock, active_list)
            } else {
                (lock, active_list)
            }
        };

        let archive = if active_only {
            None
        } else {
            let logrotate = LogRotate::new(&setup.task_archive_fn, true)
                .ok_or_else(|| format_err!("could not get archive file names"))?;
            Some(logrotate.files())
        };

        let lock = if active_only { None } else { Some(read_lock) };

        Ok(Self {
            list: active_list.into(),
            end: active_only,
            archive,
            lock,
        })
    }
}
665
impl Iterator for TaskListInfoIterator {
    type Item = Result<TaskListInfo, Error>;

    // Yield entries from the active list first (via pop_back), then
    // refill from one archive file at a time until all are exhausted;
    // the shared lock (if any) is released at the very end.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(element) = self.list.pop_back() {
                return Some(Ok(element));
            } else if self.end {
                return None;
            } else {
                if let Some(mut archive) = self.archive.take() {
                    if let Some(file) = archive.next() {
                        let list = match read_task_file(file) {
                            Ok(list) => list,
                            Err(err) => return Some(Err(err)),
                        };
                        self.list.append(&mut list.into());
                        self.archive = Some(archive);
                        continue;
                    }
                }

                // no more archive files: finish and drop the lock
                self.end = true;
                self.lock.take();
            }
        }
    }
}
694
/// Launch long running worker tasks.
///
/// A worker task can either be a whole thread, or a simply tokio
/// task/future. Each task can `log()` messages, which are stored
/// persistently to files. Task should poll the `abort_requested`
/// flag, and stop execution when requested.
pub struct WorkerTask {
    // global paths/options (initialized, see WorkerTask::new)
    setup: &'static WorkerTaskSetup,
    // unique process/task identifier
    upid: UPID,
    // mutable task state (logger, progress, abort listeners)
    data: Mutex<WorkerTaskData>,
    // set once an abort was requested; tasks poll this cooperatively
    abort_requested: AtomicBool,
}
707
708 impl std::fmt::Display for WorkerTask {
709
710 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
711 self.upid.fmt(f)
712 }
713 }
714
// Mutable state of a running worker task, guarded by WorkerTask::data.
struct WorkerTaskData {
    // per-task log file writer
    logger: FileLogger,
    progress: f64, // 0..1
    // number of log_warning() calls, used to compute the final state
    warn_count: u64,
    // channels resolved when an abort is requested (see abort_future)
    pub abort_listeners: Vec<oneshot::Sender<()>>,
}
721
722 impl WorkerTask {
723
724 pub fn new(
725 worker_type: &str,
726 worker_id: Option<String>,
727 auth_id: String,
728 to_stdout: bool,
729 ) -> Result<Arc<Self>, Error> {
730
731 let setup = worker_task_setup()?;
732
733 let upid = UPID::new(worker_type, worker_id, auth_id)?;
734 let task_id = upid.task_id;
735
736 let mut path = setup.taskdir.clone();
737
738 path.push(format!("{:02X}", upid.pstart & 255));
739
740 let dir_opts = setup.file_opts.clone()
741 .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
742
743 create_path(&path, None, Some(dir_opts))?;
744
745 path.push(upid.to_string());
746
747 let logger_options = FileLogOptions {
748 to_stdout,
749 exclusive: true,
750 prefix_time: true,
751 read: true,
752 file_opts: setup.file_opts.clone(),
753 ..Default::default()
754 };
755 let logger = FileLogger::new(&path, logger_options)?;
756
757 let worker = Arc::new(Self {
758 setup,
759 upid: upid.clone(),
760 abort_requested: AtomicBool::new(false),
761 data: Mutex::new(WorkerTaskData {
762 logger,
763 progress: 0.0,
764 warn_count: 0,
765 abort_listeners: vec![],
766 }),
767 });
768
769 // scope to drop the lock again after inserting
770 {
771 let mut hash = WORKER_TASK_LIST.lock().unwrap();
772 hash.insert(task_id, worker.clone());
773 crate::set_worker_count(hash.len());
774 }
775
776 setup.update_active_workers(Some(&upid))?;
777
778 Ok(worker)
779 }
780
781 /// Spawn a new tokio task/future.
782 pub fn spawn<F, T>(
783 worker_type: &str,
784 worker_id: Option<String>,
785 auth_id: String,
786 to_stdout: bool,
787 f: F,
788 ) -> Result<String, Error>
789 where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
790 T: Send + 'static + Future<Output = Result<(), Error>>,
791 {
792 let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
793 let upid_str = worker.upid.to_string();
794 let f = f(worker.clone());
795 tokio::spawn(async move {
796 let result = f.await;
797 worker.log_result(&result);
798 });
799
800 Ok(upid_str)
801 }
802
803 /// Create a new worker thread.
804 pub fn new_thread<F>(
805 worker_type: &str,
806 worker_id: Option<String>,
807 auth_id: String,
808 to_stdout: bool,
809 f: F,
810 ) -> Result<String, Error>
811 where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>
812 {
813 let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
814 let upid_str = worker.upid.to_string();
815
816 let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || {
817 let worker1 = worker.clone();
818 let result = match std::panic::catch_unwind(move || f(worker1)) {
819 Ok(r) => r,
820 Err(panic) => {
821 match panic.downcast::<&str>() {
822 Ok(panic_msg) => {
823 Err(format_err!("worker panicked: {}", panic_msg))
824 }
825 Err(_) => {
826 Err(format_err!("worker panicked: unknown type."))
827 }
828 }
829 }
830 };
831
832 worker.log_result(&result);
833 });
834
835 Ok(upid_str)
836 }
837
838 /// create state from self and a result
839 pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
840 let warn_count = self.data.lock().unwrap().warn_count;
841
842 let endtime = proxmox_time::epoch_i64();
843
844 if let Err(err) = result {
845 TaskState::Error { message: err.to_string(), endtime }
846 } else if warn_count > 0 {
847 TaskState::Warning { count: warn_count, endtime }
848 } else {
849 TaskState::OK { endtime }
850 }
851 }
852
853 /// Log task result, remove task from running list
854 pub fn log_result(&self, result: &Result<(), Error>) {
855 let state = self.create_state(result);
856 self.log_message(state.result_text());
857
858 WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
859 let _ = self.setup.update_active_workers(None);
860 crate::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
861 }
862
863 /// Log a message.
864 pub fn log_message<S: AsRef<str>>(&self, msg: S) {
865 let mut data = self.data.lock().unwrap();
866 data.logger.log(msg);
867 }
868
869 /// Log a message as warning.
870 pub fn log_warning<S: AsRef<str>>(&self, msg: S) {
871 let mut data = self.data.lock().unwrap();
872 data.logger.log(format!("WARN: {}", msg.as_ref()));
873 data.warn_count += 1;
874 }
875
876 /// Set progress indicator
877 pub fn progress(&self, progress: f64) {
878 if progress >= 0.0 && progress <= 1.0 {
879 let mut data = self.data.lock().unwrap();
880 data.progress = progress;
881 } else {
882 // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
883 }
884 }
885
886 /// Request abort
887 pub fn request_abort(&self) {
888 let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst);
889 if !prev_abort { // log abort one time
890 self.log_message(format!("received abort request ..."));
891 }
892 // noitify listeners
893 let mut data = self.data.lock().unwrap();
894 loop {
895 match data.abort_listeners.pop() {
896 None => { break; },
897 Some(ch) => {
898 let _ = ch.send(()); // ignore errors here
899 },
900 }
901 }
902 }
903
904 /// Get a future which resolves on task abort
905 pub fn abort_future(&self) -> oneshot::Receiver<()> {
906 let (tx, rx) = oneshot::channel::<()>();
907
908 let mut data = self.data.lock().unwrap();
909 if self.abort_requested() {
910 let _ = tx.send(());
911 } else {
912 data.abort_listeners.push(tx);
913 }
914 rx
915 }
916
917 pub fn upid(&self) -> &UPID {
918 &self.upid
919 }
920 }
921
impl WorkerTaskContext for WorkerTask {

    fn abort_requested(&self) -> bool {
        self.abort_requested.load(Ordering::SeqCst)
    }

    fn shutdown_requested(&self) -> bool {
        crate::shutdown_requested()
    }

    fn fail_on_shutdown(&self) -> Result<(), Error> {
        crate::fail_on_shutdown()
    }

    // Route log-crate levels into the task log file.
    //
    // NOTE(review): `Error` is mapped to log_warning(), i.e. it gets a
    // "WARN: " prefix and bumps the warning counter - confirm this is
    // intentional and errors are not supposed to get their own prefix.
    fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
        match level {
            log::Level::Error => self.log_warning(&message.to_string()),
            log::Level::Warn => self.log_warning(&message.to_string()),
            log::Level::Info => self.log_message(&message.to_string()),
            log::Level::Debug => self.log_message(&format!("DEBUG: {}", message)),
            log::Level::Trace => self.log_message(&format!("TRACE: {}", message)),
        }
    }
}
946
947 /// Wait for a locally spanned worker task
948 ///
949 /// Note: local workers should print logs to stdout, so there is no
950 /// need to fetch/display logs. We just wait for the worker to finish.
951 pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
952
953 let upid: UPID = upid_str.parse()?;
954
955 let sleep_duration = core::time::Duration::new(0, 100_000_000);
956
957 loop {
958 if worker_is_active_local(&upid) {
959 tokio::time::sleep(sleep_duration).await;
960 } else {
961 break;
962 }
963 }
964 Ok(())
965 }
966
967 /// Request abort of a local worker (if existing and running)
968 pub fn abort_local_worker(upid: UPID) {
969 if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) {
970 worker.request_abort();
971 }
972 }