]> git.proxmox.com Git - proxmox-backup.git/blame - src/server/worker_task.rs
api2/status: use the TaskListInfoIterator here
[proxmox-backup.git] / src / server / worker_task.rs
CommitLineData
e7244387 1use std::collections::{HashMap, VecDeque};
4b01c983 2use std::fs::File;
5ade6c25 3use std::io::{Read, Write, BufRead, BufReader};
d3f4c08f 4use std::panic::UnwindSafe;
18c0df4c
WB
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, Mutex};
d3f4c08f 7
f7d4e4b5 8use anyhow::{bail, format_err, Error};
18c0df4c
WB
9use futures::*;
10use lazy_static::lazy_static;
619495b2 11use nix::unistd::Pid;
321070b4 12use serde_json::{json, Value};
4c116baf 13use serde::{Serialize, Deserialize};
18c0df4c 14use tokio::sync::oneshot;
479f6e40 15
619495b2 16use proxmox::sys::linux::procfs;
9ea4bce4 17use proxmox::try_block;
98c259b4 18use proxmox::tools::fs::{create_path, open_file_locked, replace_file, CreateOptions};
e18a6c9e 19
634132fe
DM
20use super::UPID;
21
e7244387 22use crate::tools::logrotate::{LogRotate, LogRotateFiles};
e18a6c9e 23use crate::tools::FileLogger;
e7cb4dc5 24use crate::api2::types::Userid;
479f6e40 25
// Path building blocks as macros (not consts) so they can be combined with
// `concat!` below, which only accepts literals and macro invocations.
macro_rules! PROXMOX_BACKUP_VAR_RUN_DIR_M {
    () => {
        "/run/proxmox-backup"
    };
}
macro_rules! PROXMOX_BACKUP_LOG_DIR_M {
    () => {
        "/var/log/proxmox-backup"
    };
}
macro_rules! PROXMOX_BACKUP_TASK_DIR_M {
    () => {
        concat!(PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks")
    };
}

/// Runtime state directory.
pub const PROXMOX_BACKUP_VAR_RUN_DIR: &str = PROXMOX_BACKUP_VAR_RUN_DIR_M!();
/// Base log directory.
pub const PROXMOX_BACKUP_LOG_DIR: &str = PROXMOX_BACKUP_LOG_DIR_M!();
/// Directory holding the per-task log files and the task list files.
pub const PROXMOX_BACKUP_TASK_DIR: &str = PROXMOX_BACKUP_TASK_DIR_M!();
/// Lock file guarding the active/index/archive task list files.
pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/.active.lock");
/// Task list file with the currently running tasks.
pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/active");
/// Task list file with the most recently finished tasks.
pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/index");
/// Task list file receiving finished tasks that no longer fit into the index.
pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/archive");

/// Maximum number of finished tasks kept in the index file.
const MAX_INDEX_TASKS: usize = 1000;
479f6e40
DM
39
40lazy_static! {
41 static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
d607b886
DM
42
43 static ref MY_PID: i32 = unsafe { libc::getpid() };
6a0dc4a5 44 static ref MY_PID_PSTART: u64 = procfs::PidStat::read_from_pid(Pid::from_raw(*MY_PID))
619495b2
WB
45 .unwrap()
46 .starttime;
479f6e40
DM
47}
48
634132fe 49/// Test if the task is still running
5751e495
DM
50pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
51 if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
52 return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
53 }
54
55 if !procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() {
56 return Ok(false);
57 }
58
59 let socketname = format!(
60 "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, upid.pid);
61
62 let cmd = json!({
63 "command": "status",
64 "upid": upid.to_string(),
65 });
66
67 let status = super::send_command(socketname, cmd).await?;
4494d078 68
5751e495
DM
69 if let Some(active) = status.as_bool() {
70 Ok(active)
71 } else {
72 bail!("got unexpected result {:?} (expected bool)", status);
73 }
74}
75
76/// Test if the task is still running (fast but inaccurate implementation)
77///
78/// If the task is spanned from a different process, we simply return if
79/// that process is still running. This information is good enough to detect
80/// stale tasks...
77ebbefc 81pub fn worker_is_active_local(upid: &UPID) -> bool {
634132fe 82 if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
62ee2eb4 83 WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
634132fe 84 } else {
62ee2eb4 85 procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
479f6e40
DM
86 }
87}
88
d607b886
DM
89pub fn create_task_control_socket() -> Result<(), Error> {
90
91 let socketname = format!(
9b002cbc 92 "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, *MY_PID);
d607b886 93
9b002cbc 94 let control_future = super::create_control_socket(socketname, |param| {
d607b886 95 let param = param.as_object()
62ee2eb4 96 .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
321070b4 97 if param.keys().count() != 2 { bail!("wrong number of parameters"); }
d607b886 98
5751e495 99 let command = param["command"].as_str()
62ee2eb4 100 .ok_or_else(|| format_err!("unable to parse parameters (missing command)"))?;
d607b886 101
5751e495
DM
102 // we have only two commands for now
103 if !(command == "abort-task" || command == "status") { bail!("got unknown command '{}'", command); }
d607b886
DM
104
105 let upid_str = param["upid"].as_str()
62ee2eb4 106 .ok_or_else(|| format_err!("unable to parse parameters (missing upid)"))?;
d607b886
DM
107
108 let upid = upid_str.parse::<UPID>()?;
109
110 if !((upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART)) {
111 bail!("upid does not belong to this process");
112 }
113
114 let hash = WORKER_TASK_LIST.lock().unwrap();
5751e495
DM
115
116 match command {
117 "abort-task" => {
118 if let Some(ref worker) = hash.get(&upid.task_id) {
119 worker.request_abort();
120 } else {
121 // assume task is already stopped
122 }
123 Ok(Value::Null)
124 }
125 "status" => {
126 let active = hash.contains_key(&upid.task_id);
127 Ok(active.into())
128 }
129 _ => {
130 bail!("got unknown command '{}'", command);
131 }
d607b886 132 }
d607b886
DM
133 })?;
134
135 tokio::spawn(control_future);
136
137 Ok(())
138}
139
321070b4 140pub fn abort_worker_async(upid: UPID) {
75fef4b4
WB
141 tokio::spawn(async move {
142 if let Err(err) = abort_worker(upid).await {
321070b4
DM
143 eprintln!("abort worker failed - {}", err);
144 }
75fef4b4 145 });
321070b4
DM
146}
147
5751e495 148pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
321070b4
DM
149
150 let target_pid = upid.pid;
151
152 let socketname = format!(
153 "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, target_pid);
154
155 let cmd = json!({
156 "command": "abort-task",
157 "upid": upid.to_string(),
158 });
159
5751e495 160 super::send_command(socketname, cmd).map_ok(|_| ()).await
321070b4
DM
161}
162
77bd2a46 163fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
4b01c983
DM
164
165 let data = line.splitn(3, ' ').collect::<Vec<&str>>();
166
167 let len = data.len();
168
169 match len {
170 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)),
171 3 => {
172 let endtime = i64::from_str_radix(data[1], 16)?;
77bd2a46
DC
173 let state = TaskState::from_endtime_and_message(endtime, data[2])?;
174 Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state)))
4b01c983
DM
175 }
176 _ => bail!("wrong number of components"),
177 }
178}
179
35950380 180/// Create task log directory with correct permissions
d607b886 181pub fn create_task_log_dirs() -> Result<(), Error> {
35950380
DM
182
183 try_block!({
f74a03da 184 let backup_user = crate::backup::backup_user()?;
35238e23 185 let opts = CreateOptions::new()
f74a03da
DM
186 .owner(backup_user.uid)
187 .group(backup_user.gid);
35950380 188
35238e23
WB
189 create_path(PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?;
190 create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?;
191 create_path(PROXMOX_BACKUP_VAR_RUN_DIR, None, Some(opts))?;
35950380
DM
192 Ok(())
193 }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
194
195 Ok(())
196}
197
ae197dda
DC
198/// Read endtime (time of last log line) and exitstatus from task log file
199/// If there is not a single line with at valid datetime, we assume the
200/// starttime to be the endtime
77bd2a46 201pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
56b66645
DM
202
203 let mut status = TaskState::Unknown { endtime: upid.starttime };
4b01c983 204
4494d078 205 let path = upid.log_path();
4b01c983 206
0bfd87bc
DM
207 let mut file = File::open(path)?;
208
209 /// speedup - only read tail
210 use std::io::Seek;
211 use std::io::SeekFrom;
212 let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
213
56b66645
DM
214 let mut data = Vec::with_capacity(8192);
215 file.read_to_end(&mut data)?;
4b01c983 216
a4c11436 217 // task logs should end with newline, we do not want it here
5e39918f 218 if data.len() > 0 && data[data.len()-1] == b'\n' {
a4c11436
DC
219 data.pop();
220 }
221
56b66645
DM
222 let last_line = {
223 let mut start = 0;
a4c11436 224 for pos in (0..data.len()).rev() {
56b66645 225 if data[pos] == b'\n' {
5e39918f 226 start = data.len().min(pos + 1);
56b66645
DM
227 break;
228 }
ae197dda 229 }
56b66645
DM
230 &data[start..]
231 };
232
233 let last_line = std::str::from_utf8(last_line)
234 .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
235
236 let mut iter = last_line.splitn(2, ": ");
237 if let Some(time_str) = iter.next() {
6a7be83e 238 if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) {
56b66645 239 if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
77bd2a46 240 if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
4c116baf 241 status = state;
4b01c983
DM
242 }
243 }
244 }
245 }
246
77bd2a46 247 Ok(status)
4b01c983
DM
248}
249
4c116baf 250/// Task State
77bd2a46 251#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
4c116baf
DC
252pub enum TaskState {
253 /// The Task ended with an undefined state
77bd2a46 254 Unknown { endtime: i64 },
4c116baf 255 /// The Task ended and there were no errors or warnings
77bd2a46 256 OK { endtime: i64 },
4c116baf 257 /// The Task had 'count' amount of warnings and no errors
77bd2a46 258 Warning { count: u64, endtime: i64 },
4c116baf 259 /// The Task ended with the error described in 'message'
77bd2a46 260 Error { message: String, endtime: i64 },
4c116baf
DC
261}
262
263impl TaskState {
77bd2a46
DC
264 pub fn endtime(&self) -> i64 {
265 match *self {
266 TaskState::Unknown { endtime } => endtime,
267 TaskState::OK { endtime } => endtime,
268 TaskState::Warning { endtime, .. } => endtime,
269 TaskState::Error { endtime, .. } => endtime,
4c116baf
DC
270 }
271 }
4c116baf 272
77bd2a46 273 fn result_text(&self) -> String {
4c116baf 274 match self {
77bd2a46
DC
275 TaskState::Error { message, .. } => format!("TASK ERROR: {}", message),
276 other => format!("TASK {}", other),
4c116baf
DC
277 }
278 }
4c116baf 279
77bd2a46 280 fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> {
4c116baf 281 if s == "unknown" {
77bd2a46 282 Ok(TaskState::Unknown { endtime })
4c116baf 283 } else if s == "OK" {
77bd2a46 284 Ok(TaskState::OK { endtime })
4c116baf
DC
285 } else if s.starts_with("WARNINGS: ") {
286 let count: u64 = s[10..].parse()?;
77bd2a46 287 Ok(TaskState::Warning{ count, endtime })
4c116baf
DC
288 } else if s.len() > 0 {
289 let message = if s.starts_with("ERROR: ") { &s[7..] } else { s }.to_string();
77bd2a46 290 Ok(TaskState::Error{ message, endtime })
4c116baf
DC
291 } else {
292 bail!("unable to parse Task Status '{}'", s);
293 }
294 }
295}
296
77bd2a46
DC
297impl std::cmp::PartialOrd for TaskState {
298 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
299 Some(self.endtime().cmp(&other.endtime()))
300 }
301}
302
303impl std::cmp::Ord for TaskState {
304 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
305 self.endtime().cmp(&other.endtime())
306 }
307}
308
309impl std::fmt::Display for TaskState {
310 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
311 match self {
312 TaskState::Unknown { .. } => write!(f, "unknown"),
313 TaskState::OK { .. }=> write!(f, "OK"),
314 TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count),
315 TaskState::Error { message, .. } => write!(f, "{}", message),
316 }
317 }
318}
319
93aebb38
DM
320/// Task details including parsed UPID
321///
322/// If there is no `state`, the task is still running.
323#[derive(Debug)]
324pub struct TaskListInfo {
325 /// The parsed UPID
326 pub upid: UPID,
327 /// UPID string representation
328 pub upid_str: String,
329 /// Task `(endtime, status)` if already finished
77bd2a46 330 pub state: Option<TaskState>, // endtime, status
93aebb38
DM
331}
332
66f4e6a8
DC
333fn lock_task_list_files(exclusive: bool) -> Result<std::fs::File, Error> {
334 let backup_user = crate::backup::backup_user()?;
335
336 let lock = open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN, std::time::Duration::new(10, 0), exclusive)?;
337 nix::unistd::chown(PROXMOX_BACKUP_TASK_LOCK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
338
339 Ok(lock)
340}
341
93aebb38
DM
342// atomically read/update the task list, update status of finished tasks
343// new_upid is added to the list when specified.
344// Returns a sorted list of known tasks,
345fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, Error> {
4b01c983 346
f74a03da 347 let backup_user = crate::backup::backup_user()?;
35950380 348
66f4e6a8 349 let lock = lock_task_list_files(true)?;
4b01c983 350
784fa1c2
DC
351 let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
352 let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
353 .into_iter()
354 .filter_map(|info| {
355 if info.state.is_some() {
356 // this can happen when the active file still includes finished tasks
357 finish_list.push(info);
358 return None;
4b01c983 359 }
4b01c983 360
784fa1c2
DC
361 if !worker_is_active_local(&info.upid) {
362 println!("Detected stopped UPID {}", &info.upid_str);
363 let now = proxmox::tools::time::epoch_i64();
364 let status = upid_read_status(&info.upid)
365 .unwrap_or_else(|_| TaskState::Unknown { endtime: now });
366 finish_list.push(TaskListInfo {
367 upid: info.upid,
368 upid_str: info.upid_str,
369 state: Some(status)
370 });
371 return None;
4b01c983 372 }
784fa1c2
DC
373
374 Some(info)
375 }).collect();
4b01c983
DM
376
377 if let Some(upid) = new_upid {
378 active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
379 }
380
784fa1c2 381 let active_raw = render_task_list(&active_list);
4b01c983 382
784fa1c2
DC
383 replace_file(
384 PROXMOX_BACKUP_ACTIVE_TASK_FN,
385 active_raw.as_bytes(),
386 CreateOptions::new()
387 .owner(backup_user.uid)
388 .group(backup_user.gid),
389 )?;
93aebb38 390
784fa1c2 391 finish_list.sort_unstable_by(|a, b| {
4b01c983 392 match (&a.state, &b.state) {
77bd2a46 393 (Some(s1), Some(s2)) => s1.cmp(&s2),
4b01c983
DM
394 (Some(_), None) => std::cmp::Ordering::Less,
395 (None, Some(_)) => std::cmp::Ordering::Greater,
396 _ => a.upid.starttime.cmp(&b.upid.starttime),
397 }
398 });
399
784fa1c2
DC
400 let start = (finish_list.len()-MAX_INDEX_TASKS).max(0);
401 let end = (start+MAX_INDEX_TASKS).min(finish_list.len());
402 let index_raw = render_task_list(&finish_list[start..end]);
4b01c983 403
feaa1ad3 404 replace_file(
784fa1c2
DC
405 PROXMOX_BACKUP_INDEX_TASK_FN,
406 index_raw.as_bytes(),
feaa1ad3 407 CreateOptions::new()
f74a03da
DM
408 .owner(backup_user.uid)
409 .group(backup_user.gid),
feaa1ad3 410 )?;
4b01c983 411
5ade6c25
DC
412 if !finish_list.is_empty() && start > 0 {
413 match std::fs::OpenOptions::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN) {
414 Ok(mut writer) => {
415 for info in &finish_list[0..start] {
416 writer.write_all(render_task_line(&info).as_bytes())?;
417 }
418 },
419 Err(err) => bail!("could not write task archive - {}", err),
420 }
421
422 nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
423 }
424
4b01c983
DM
425 drop(lock);
426
784fa1c2
DC
427 finish_list.append(&mut active_list);
428 finish_list.reverse();
429 Ok(finish_list)
4b01c983
DM
430}
431
93aebb38
DM
432/// Returns a sorted list of known tasks
433///
434/// The list is sorted by `(starttime, endtime)` in ascending order
435pub fn read_task_list() -> Result<Vec<TaskListInfo>, Error> {
436 update_active_workers(None)
437}
4b01c983 438
bbeb0256
DC
439fn render_task_line(info: &TaskListInfo) -> String {
440 let mut raw = String::new();
441 if let Some(status) = &info.state {
442 raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status));
443 } else {
444 raw.push_str(&info.upid_str);
445 raw.push('\n');
446 }
447
448 raw
449}
450
451fn render_task_list(list: &[TaskListInfo]) -> String {
452 let mut raw = String::new();
453 for info in list {
454 raw.push_str(&render_task_line(&info));
455 }
456 raw
457}
458
784fa1c2
DC
459// note this is not locked, caller has to make sure it is
460// this will skip (and log) lines that are not valid status lines
461fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error>
462{
463 let reader = BufReader::new(reader);
464 let mut list = Vec::new();
465 for line in reader.lines() {
466 let line = line?;
467 match parse_worker_status_line(&line) {
468 Ok((upid_str, upid, state)) => list.push(TaskListInfo {
469 upid_str,
470 upid,
471 state
472 }),
473 Err(err) => {
474 eprintln!("unable to parse worker status '{}' - {}", line, err);
475 continue;
476 }
477 };
478 }
479
480 Ok(list)
481}
482
483// note this is not locked, caller has to make sure it is
484fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error>
485where
486 P: AsRef<std::path::Path> + std::fmt::Debug,
487{
488 let file = match File::open(&path) {
489 Ok(f) => f,
490 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
491 Err(err) => bail!("unable to open task list {:?} - {}", path, err),
492 };
493
494 read_task_file(file)
495}
496
e7244387
DC
/// Which task list source `TaskListInfoIterator` has read last.
enum TaskFile {
    /// The active task file (read on construction).
    Active,
    /// The index file of finished tasks.
    Index,
    /// One of the rotated archive files.
    Archive,
    /// All sources are exhausted.
    End,
}
503
504pub struct TaskListInfoIterator {
505 list: VecDeque<TaskListInfo>,
506 file: TaskFile,
507 archive: Option<LogRotateFiles>,
508 lock: Option<File>,
509}
510
511impl TaskListInfoIterator {
512 pub fn new(active_only: bool) -> Result<Self, Error> {
513 let (read_lock, active_list) = {
514 let lock = lock_task_list_files(false)?;
515 let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
516
517 let needs_update = active_list
518 .iter()
519 .any(|info| info.state.is_none() && !worker_is_active_local(&info.upid));
520
521 if needs_update {
522 drop(lock);
523 update_active_workers(None)?;
524 let lock = lock_task_list_files(false)?;
525 let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
526 (lock, active_list)
527 } else {
528 (lock, active_list)
529 }
530 };
531
532 let archive = if active_only {
533 None
534 } else {
535 let logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, true).ok_or_else(|| format_err!("could not get archive file names"))?;
536 Some(logrotate.files())
537 };
538
539 let file = if active_only { TaskFile::End } else { TaskFile::Active };
540 let lock = if active_only { None } else { Some(read_lock) };
541
542 Ok(Self {
543 list: active_list.into(),
544 file,
545 archive,
546 lock,
547 })
548 }
549}
550
551impl Iterator for TaskListInfoIterator {
552 type Item = Result<TaskListInfo, Error>;
553
554 fn next(&mut self) -> Option<Self::Item> {
555 loop {
556 if let Some(element) = self.list.pop_back() {
557 return Some(Ok(element));
558 } else {
559 match self.file {
560 TaskFile::Active => {
561 let index = match read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN) {
562 Ok(index) => index,
563 Err(err) => return Some(Err(err)),
564 };
565 self.list.append(&mut index.into());
566 self.file = TaskFile::Index;
567 },
568 TaskFile::Index | TaskFile::Archive => {
569 if let Some(mut archive) = self.archive.take() {
570 if let Some(file) = archive.next() {
571 let list = match read_task_file(file) {
572 Ok(list) => list,
573 Err(err) => return Some(Err(err)),
574 };
575 self.list.append(&mut list.into());
576 self.archive = Some(archive);
577 self.file = TaskFile::Archive;
578 continue;
579 }
580 }
581 self.file = TaskFile::End;
582 self.lock.take();
583 return None;
584 }
585 TaskFile::End => return None,
586 }
587 }
588 }
589 }
590}
591
882594c5
DM
592/// Launch long running worker tasks.
593///
594/// A worker task can either be a whole thread, or a simply tokio
595/// task/future. Each task can `log()` messages, which are stored
596/// persistently to files. Task should poll the `abort_requested`
597/// flag, and stop execution when requested.
479f6e40
DM
598#[derive(Debug)]
599pub struct WorkerTask {
600 upid: UPID,
601 data: Mutex<WorkerTaskData>,
602 abort_requested: AtomicBool,
603}
604
605impl std::fmt::Display for WorkerTask {
606
607 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
608 self.upid.fmt(f)
609 }
610}
611
612#[derive(Debug)]
613struct WorkerTaskData {
614 logger: FileLogger,
615 progress: f64, // 0..1
f6de2c73 616 warn_count: u64,
75bc49be 617 pub abort_listeners: Vec<oneshot::Sender<()>>,
479f6e40
DM
618}
619
620impl Drop for WorkerTask {
621
622 fn drop(&mut self) {
623 println!("unregister worker");
624 }
625}
626
627impl WorkerTask {
628
e7cb4dc5 629 pub fn new(worker_type: &str, worker_id: Option<String>, userid: Userid, to_stdout: bool) -> Result<Arc<Self>, Error> {
479f6e40
DM
630 println!("register worker");
631
e7cb4dc5 632 let upid = UPID::new(worker_type, worker_id, userid)?;
634132fe 633 let task_id = upid.task_id;
479f6e40 634
634132fe 635 let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR);
35950380 636
479f6e40
DM
637 path.push(format!("{:02X}", upid.pstart % 256));
638
f74a03da 639 let backup_user = crate::backup::backup_user()?;
35950380 640
f74a03da 641 create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?;
479f6e40
DM
642
643 path.push(upid.to_string());
644
645 println!("FILE: {:?}", path);
646
35950380 647 let logger = FileLogger::new(&path, to_stdout)?;
f74a03da 648 nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?;
479f6e40
DM
649
650 let worker = Arc::new(Self {
05d755b2 651 upid: upid.clone(),
479f6e40
DM
652 abort_requested: AtomicBool::new(false),
653 data: Mutex::new(WorkerTaskData {
654 logger,
655 progress: 0.0,
f6de2c73 656 warn_count: 0,
75bc49be 657 abort_listeners: vec![],
479f6e40
DM
658 }),
659 });
660
05d755b2
DC
661 // scope to drop the lock again after inserting
662 {
663 let mut hash = WORKER_TASK_LIST.lock().unwrap();
664 hash.insert(task_id, worker.clone());
665 super::set_worker_count(hash.len());
666 }
7a630df7 667
05d755b2 668 update_active_workers(Some(&upid))?;
479f6e40
DM
669
670 Ok(worker)
671 }
672
882594c5 673 /// Spawn a new tokio task/future.
660c6846
DM
674 pub fn spawn<F, T>(
675 worker_type: &str,
676 worker_id: Option<String>,
e7cb4dc5 677 userid: Userid,
660c6846
DM
678 to_stdout: bool,
679 f: F,
680 ) -> Result<String, Error>
479f6e40 681 where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
75fef4b4 682 T: Send + 'static + Future<Output = Result<(), Error>>,
479f6e40 683 {
e7cb4dc5 684 let worker = WorkerTask::new(worker_type, worker_id, userid, to_stdout)?;
660c6846 685 let upid_str = worker.upid.to_string();
75fef4b4
WB
686 let f = f(worker.clone());
687 tokio::spawn(async move {
688 let result = f.await;
dd8e744f 689 worker.log_result(&result);
75fef4b4 690 });
479f6e40 691
660c6846 692 Ok(upid_str)
479f6e40
DM
693 }
694
882594c5 695 /// Create a new worker thread.
660c6846
DM
696 pub fn new_thread<F>(
697 worker_type: &str,
698 worker_id: Option<String>,
e7cb4dc5 699 userid: Userid,
660c6846
DM
700 to_stdout: bool,
701 f: F,
702 ) -> Result<String, Error>
d3f4c08f 703 where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>
479f6e40
DM
704 {
705 println!("register worker thread");
706
e7cb4dc5 707 let worker = WorkerTask::new(worker_type, worker_id, userid, to_stdout)?;
660c6846 708 let upid_str = worker.upid.to_string();
479f6e40 709
217170e1 710 let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || {
d3f4c08f
DM
711 let worker1 = worker.clone();
712 let result = match std::panic::catch_unwind(move || f(worker1)) {
713 Ok(r) => r,
714 Err(panic) => {
715 match panic.downcast::<&str>() {
716 Ok(panic_msg) => {
717 Err(format_err!("worker panicked: {}", panic_msg))
718 }
719 Err(_) => {
720 Err(format_err!("worker panicked: unknown type."))
721 }
722 }
723 }
724 };
725
dd8e744f 726 worker.log_result(&result);
479f6e40
DM
727 });
728
660c6846 729 Ok(upid_str)
479f6e40
DM
730 }
731
4c116baf
DC
732 /// create state from self and a result
733 pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
f6de2c73 734 let warn_count = self.data.lock().unwrap().warn_count;
cef03f41 735
6a7be83e 736 let endtime = proxmox::tools::time::epoch_i64();
77bd2a46 737
4b01c983 738 if let Err(err) = result {
77bd2a46 739 TaskState::Error { message: err.to_string(), endtime }
f6de2c73 740 } else if warn_count > 0 {
77bd2a46 741 TaskState::Warning { count: warn_count, endtime }
4b01c983 742 } else {
77bd2a46 743 TaskState::OK { endtime }
4b01c983 744 }
cef03f41
DC
745 }
746
747 /// Log task result, remove task from running list
748 pub fn log_result(&self, result: &Result<(), Error>) {
4c116baf
DC
749 let state = self.create_state(result);
750 self.log(state.result_text());
418def7a
DM
751
752 WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
753 let _ = update_active_workers(None);
754 super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
4b01c983
DM
755 }
756
882594c5 757 /// Log a message.
479f6e40
DM
758 pub fn log<S: AsRef<str>>(&self, msg: S) {
759 let mut data = self.data.lock().unwrap();
760 data.logger.log(msg);
761 }
762
f6de2c73
DC
763 /// Log a message as warning.
764 pub fn warn<S: AsRef<str>>(&self, msg: S) {
765 let mut data = self.data.lock().unwrap();
766 data.logger.log(format!("WARN: {}", msg.as_ref()));
767 data.warn_count += 1;
768 }
769
882594c5 770 /// Set progress indicator
479f6e40
DM
771 pub fn progress(&self, progress: f64) {
772 if progress >= 0.0 && progress <= 1.0 {
773 let mut data = self.data.lock().unwrap();
774 data.progress = progress;
775 } else {
776 // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
777 }
778 }
779
882594c5 780 /// Request abort
d607b886 781 pub fn request_abort(&self) {
98a181f0 782 eprintln!("set abort flag for worker {}", self.upid);
479f6e40 783 self.abort_requested.store(true, Ordering::SeqCst);
75bc49be
DM
784 // noitify listeners
785 let mut data = self.data.lock().unwrap();
786 loop {
787 match data.abort_listeners.pop() {
788 None => { break; },
789 Some(ch) => {
790 let _ = ch.send(()); // ignore erros here
791 },
792 }
793 }
479f6e40
DM
794 }
795
882594c5 796 /// Test if abort was requested.
479f6e40
DM
797 pub fn abort_requested(&self) -> bool {
798 self.abort_requested.load(Ordering::SeqCst)
799 }
800
882594c5 801 /// Fail if abort was requested.
479f6e40
DM
802 pub fn fail_on_abort(&self) -> Result<(), Error> {
803 if self.abort_requested() {
99641a6b 804 bail!("abort requested - aborting task");
479f6e40
DM
805 }
806 Ok(())
807 }
75bc49be
DM
808
809 /// Get a future which resolves on task abort
810 pub fn abort_future(&self) -> oneshot::Receiver<()> {
811 let (tx, rx) = oneshot::channel::<()>();
812
813 let mut data = self.data.lock().unwrap();
814 if self.abort_requested() {
815 let _ = tx.send(());
816 } else {
817 data.abort_listeners.push(tx);
818 }
819 rx
820 }
4bd2a9e4
DC
821
822 pub fn upid(&self) -> &UPID {
823 &self.upid
824 }
479f6e40 825}