]> git.proxmox.com Git - proxmox-backup.git/blame - proxmox-rest-server/src/worker_task.rs
update to first proxmox crate split
[proxmox-backup.git] / proxmox-rest-server / src / worker_task.rs
CommitLineData
e7244387 1use std::collections::{HashMap, VecDeque};
4b01c983 2use std::fs::File;
0a33fba4 3use std::path::PathBuf;
5ade6c25 4use std::io::{Read, Write, BufRead, BufReader};
d3f4c08f 5use std::panic::UnwindSafe;
18c0df4c
WB
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
de55fff2 8use std::time::{SystemTime, Duration};
d3f4c08f 9
f7d4e4b5 10use anyhow::{bail, format_err, Error};
18c0df4c
WB
11use futures::*;
12use lazy_static::lazy_static;
321070b4 13use serde_json::{json, Value};
4c116baf 14use serde::{Serialize, Deserialize};
18c0df4c 15use tokio::sync::oneshot;
0a33fba4
DM
16use nix::fcntl::OFlag;
17use once_cell::sync::OnceCell;
479f6e40 18
619495b2 19use proxmox::sys::linux::procfs;
0a33fba4 20use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
6ef1b649
WB
21use proxmox_lang::try_block;
22use proxmox_schema::upid::UPID;
e18a6c9e 23
c8449217 24use pbs_tools::task::WorkerTaskContext;
6c76aa43 25use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
b9700a9f 26
49e25688 27use crate::{CommandSocket, FileLogger, FileLogOptions};
6c76aa43 28
/// Guard returned by `WorkerTaskSetup::lock_task_list_files`.
///
/// It only owns the opened lock file; keep it alive for as long as the task
/// list files are accessed. (NOTE(review): that dropping it releases the lock
/// assumes `open_file_locked` uses an fd-bound lock such as flock - confirm.)
struct TaskListLockGuard(File);
af06decd 30
0a33fba4
DM
31struct WorkerTaskSetup {
32 file_opts: CreateOptions,
33 taskdir: PathBuf,
34 task_lock_fn: PathBuf,
35 active_tasks_fn: PathBuf,
36 task_index_fn: PathBuf,
37 task_archive_fn: PathBuf,
38}
39
40static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
41
42fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
43 WORKER_TASK_SETUP.get()
44 .ok_or_else(|| format_err!("WorkerTask library is not initialized"))
45}
46
47impl WorkerTaskSetup {
48
49 fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {
50
51 let mut taskdir = basedir.clone();
52 taskdir.push("tasks");
53
54 let mut task_lock_fn = taskdir.clone();
55 task_lock_fn.push(".active.lock");
56
57 let mut active_tasks_fn = taskdir.clone();
58 active_tasks_fn.push("active");
59
60 let mut task_index_fn = taskdir.clone();
61 task_index_fn.push("index");
62
63 let mut task_archive_fn = taskdir.clone();
64 task_archive_fn.push("archive");
65
66 Self {
67 file_opts,
68 taskdir,
69 task_lock_fn,
70 active_tasks_fn,
71 task_index_fn,
72 task_archive_fn,
73 }
74 }
75
76 fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {
77 let options = self.file_opts.clone()
78 .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
79
80 let timeout = std::time::Duration::new(10, 0);
81
82 let file = proxmox::tools::fs::open_file_locked(
83 &self.task_lock_fn,
84 timeout,
85 exclusive,
86 options,
87 )?;
88
89 Ok(TaskListLockGuard(file))
90 }
91
92 fn log_path(&self, upid: &UPID) -> std::path::PathBuf {
93 let mut path = self.taskdir.clone();
94 path.push(format!("{:02X}", upid.pstart % 256));
95 path.push(upid.to_string());
96 path
97 }
98
99 // atomically read/update the task list, update status of finished tasks
100 // new_upid is added to the list when specified.
101 fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {
102
103 let lock = self.lock_task_list_files(true)?;
104
105 // TODO remove with 1.x
106 let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?;
107 let had_index_file = !finish_list.is_empty();
108
109 // We use filter_map because one negative case wants to *move* the data into `finish_list`,
110 // clippy doesn't quite catch this!
111 #[allow(clippy::unnecessary_filter_map)]
112 let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)?
113 .into_iter()
114 .filter_map(|info| {
115 if info.state.is_some() {
116 // this can happen when the active file still includes finished tasks
117 finish_list.push(info);
118 return None;
119 }
120
121 if !worker_is_active_local(&info.upid) {
122 // println!("Detected stopped task '{}'", &info.upid_str);
6ef1b649 123 let now = proxmox_time::epoch_i64();
0a33fba4
DM
124 let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
125 finish_list.push(TaskListInfo {
126 upid: info.upid,
127 upid_str: info.upid_str,
128 state: Some(status)
129 });
130 return None;
131 }
132
133 Some(info)
134 }).collect();
135
136 if let Some(upid) = new_upid {
137 active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
138 }
139
140 let active_raw = render_task_list(&active_list);
141
142 let options = self.file_opts.clone()
143 .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
144
145 replace_file(
146 &self.active_tasks_fn,
147 active_raw.as_bytes(),
148 options,
149 )?;
150
151 finish_list.sort_unstable_by(|a, b| {
152 match (&a.state, &b.state) {
153 (Some(s1), Some(s2)) => s1.cmp(&s2),
154 (Some(_), None) => std::cmp::Ordering::Less,
155 (None, Some(_)) => std::cmp::Ordering::Greater,
156 _ => a.upid.starttime.cmp(&b.upid.starttime),
157 }
158 });
159
160 if !finish_list.is_empty() {
161 let options = self.file_opts.clone()
162 .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
163
164 let mut writer = atomic_open_or_create_file(
165 &self.task_archive_fn,
166 OFlag::O_APPEND | OFlag::O_RDWR,
167 &[],
168 options,
169 )?;
170 for info in &finish_list {
171 writer.write_all(render_task_line(&info).as_bytes())?;
172 }
173 }
174
175 // TODO Remove with 1.x
176 // for compatibility, if we had an INDEX file, we do not need it anymore
177 if had_index_file {
178 let _ = nix::unistd::unlink(&self.task_index_fn);
179 }
180
181 drop(lock);
182
183 Ok(())
184 }
185
186 // Create task log directory with correct permissions
187 fn create_task_log_dirs(&self) -> Result<(), Error> {
188
189 try_block!({
190 let dir_opts = self.file_opts.clone()
191 .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
192
193 create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?;
194 // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
195 Ok(())
196 }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
197 }
198}
199
200/// Initialize the WorkerTask library
201pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
202 let setup = WorkerTaskSetup::new(basedir, file_opts);
203 setup.create_task_log_dirs()?;
204 WORKER_TASK_SETUP.set(setup)
205 .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
206}
207
208/// checks if the Task Archive is bigger that 'size_threshold' bytes, and
209/// rotates it if it is
210pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> {
211
212 let setup = worker_task_setup()?;
213
214 let _lock = setup.lock_task_list_files(true)?;
215
216 let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress)
217 .ok_or_else(|| format_err!("could not get archive file names"))?;
218
219 logrotate.rotate(size_threshold, None, max_files)
220}
221
de55fff2
DC
222/// removes all task logs that are older than the oldest task entry in the
223/// task archive
224pub fn cleanup_old_tasks(compressed: bool) -> Result<(), Error> {
225 let setup = worker_task_setup()?;
226
227 let _lock = setup.lock_task_list_files(true)?;
228
229 let logrotate = LogRotate::new(&setup.task_archive_fn, compressed)
230 .ok_or_else(|| format_err!("could not get archive file names"))?;
231
232 let mut timestamp = None;
233 if let Some(last_file) = logrotate.files().last() {
234 let reader = BufReader::new(last_file);
235 for line in reader.lines() {
236 let line = line?;
237 if let Ok((_, _, Some(state))) = parse_worker_status_line(&line) {
238 timestamp = Some(state.endtime());
239 break;
240 }
241 }
242 }
243
244 fn get_modified(entry: std::fs::DirEntry) -> Result<SystemTime, std::io::Error> {
245 entry.metadata()?.modified()
246 }
247
248 if let Some(timestamp) = timestamp {
249 let cutoff_time = if timestamp > 0 {
250 SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(timestamp as u64))
251 } else {
252 SystemTime::UNIX_EPOCH.checked_sub(Duration::from_secs(-timestamp as u64))
253 }.ok_or_else(|| format_err!("could not calculate cutoff time"))?;
254
255 for i in 0..256 {
256 let mut path = setup.taskdir.clone();
257 path.push(format!("{:02X}", i));
258 for file in std::fs::read_dir(path)? {
259 let file = file?;
260 let path = file.path();
261
262 let modified = get_modified(file)
263 .map_err(|err| format_err!("error getting mtime for {:?}: {}", path, err))?;
264
265 if modified < cutoff_time {
266 match std::fs::remove_file(path) {
267 Ok(()) => {},
268 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {},
269 Err(err) => bail!("could not remove file: {}", err),
270 }
271 }
272 }
273 }
274 }
275
276 Ok(())
277}
278
0a33fba4
DM
279
280/// Path to the worker log file
281pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
282 let setup = worker_task_setup()?;
283 Ok(setup.log_path(upid))
284}
285
286/// Read endtime (time of last log line) and exitstatus from task log file
287/// If there is not a single line with at valid datetime, we assume the
288/// starttime to be the endtime
289pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
290
291 let setup = worker_task_setup()?;
292
293 let mut status = TaskState::Unknown { endtime: upid.starttime };
294
295 let path = setup.log_path(upid);
296
297 let mut file = File::open(path)?;
298
299 /// speedup - only read tail
300 use std::io::Seek;
301 use std::io::SeekFrom;
302 let _ = file.seek(SeekFrom::End(-8192)); // ignore errors
303
304 let mut data = Vec::with_capacity(8192);
305 file.read_to_end(&mut data)?;
306
307 // strip newlines at the end of the task logs
308 while data.last() == Some(&b'\n') {
309 data.pop();
310 }
311
312 let last_line = match data.iter().rposition(|c| *c == b'\n') {
313 Some(start) if data.len() > (start+1) => &data[start+1..],
314 Some(_) => &data, // should not happen, since we removed all trailing newlines
315 None => &data,
316 };
317
318 let last_line = std::str::from_utf8(last_line)
319 .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;
320
321 let mut iter = last_line.splitn(2, ": ");
322 if let Some(time_str) = iter.next() {
6ef1b649 323 if let Ok(endtime) = proxmox_time::parse_rfc3339(time_str) {
0a33fba4
DM
324 // set the endtime even if we cannot parse the state
325 status = TaskState::Unknown { endtime };
326 if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
327 if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
328 status = state;
329 }
330 }
331 }
332 }
333
334 Ok(status)
346a488e 335}
784fa1c2 336
479f6e40
DM
337lazy_static! {
338 static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
a68768cf 339}
d607b886 340
a68768cf
TL
341/// checks if the task UPID refers to a worker from this process
342fn is_local_worker(upid: &UPID) -> bool {
b9700a9f 343 upid.pid == crate::pid() && upid.pstart == crate::pstart()
479f6e40
DM
344}
345
634132fe 346/// Test if the task is still running
5751e495 347pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
a68768cf 348 if is_local_worker(upid) {
5751e495
DM
349 return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
350 }
351
3984a5fd 352 if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() {
5751e495
DM
353 return Ok(false);
354 }
355
b9700a9f 356 let sock = crate::ctrl_sock_from_pid(upid.pid);
5751e495 357 let cmd = json!({
a68768cf 358 "command": "worker-task-status",
385681c9
TL
359 "args": {
360 "upid": upid.to_string(),
361 },
5751e495 362 });
b9700a9f 363 let status = crate::send_command(sock, &cmd).await?;
4494d078 364
5751e495
DM
365 if let Some(active) = status.as_bool() {
366 Ok(active)
367 } else {
368 bail!("got unexpected result {:?} (expected bool)", status);
369 }
370}
371
372/// Test if the task is still running (fast but inaccurate implementation)
373///
a68768cf 374/// If the task is spawned from a different process, we simply return if
5751e495
DM
375/// that process is still running. This information is good enough to detect
376/// stale tasks...
77ebbefc 377pub fn worker_is_active_local(upid: &UPID) -> bool {
a68768cf 378 if is_local_worker(upid) {
62ee2eb4 379 WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
634132fe 380 } else {
62ee2eb4 381 procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
479f6e40
DM
382 }
383}
384
49e25688 385/// Register task control command on a [CommandSocket].
2e44983a
DM
386///
387/// This create two commands:
388///
389/// * ``worker-task-abort <UPID>``: calls [abort_local_worker]
390///
391/// * ``worker-task-status <UPID>``: return true of false, depending on
392/// whether the worker is running or stopped.
a68768cf 393pub fn register_task_control_commands(
49e25688 394 commando_sock: &mut CommandSocket,
a68768cf
TL
395) -> Result<(), Error> {
396 fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
397 let args = if let Some(args) = args { args } else { bail!("missing args") };
398 let upid = match args.get("upid") {
399 Some(Value::String(upid)) => upid.parse::<UPID>()?,
400 None => bail!("no upid in args"),
401 _ => bail!("unable to parse upid"),
402 };
403 if !is_local_worker(&upid) {
d607b886
DM
404 bail!("upid does not belong to this process");
405 }
a68768cf
TL
406 Ok(upid)
407 }
d607b886 408
a68768cf
TL
409 commando_sock.register_command("worker-task-abort".into(), move |args| {
410 let upid = get_upid(args)?;
411
3f742f95
DC
412 abort_local_worker(upid);
413
a68768cf 414 Ok(Value::Null)
d607b886 415 })?;
a68768cf
TL
416 commando_sock.register_command("worker-task-status".into(), move |args| {
417 let upid = get_upid(args)?;
d607b886 418
a68768cf
TL
419 let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id);
420
421 Ok(active.into())
422 })?;
d607b886
DM
423
424 Ok(())
425}
426
2e44983a
DM
427/// Try to abort a worker task, but do no wait
428///
429/// Errors (if any) are simply logged.
430pub fn abort_worker_nowait(upid: UPID) {
75fef4b4
WB
431 tokio::spawn(async move {
432 if let Err(err) = abort_worker(upid).await {
2e44983a 433 log::error!("abort worker task failed - {}", err);
321070b4 434 }
75fef4b4 435 });
321070b4
DM
436}
437
2e44983a
DM
438/// Abort a worker task
439///
440/// By sending ``worker-task-abort`` to the control socket.
5751e495 441pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
321070b4 442
b9700a9f 443 let sock = crate::ctrl_sock_from_pid(upid.pid);
321070b4 444 let cmd = json!({
a68768cf 445 "command": "worker-task-abort",
385681c9
TL
446 "args": {
447 "upid": upid.to_string(),
448 },
321070b4 449 });
b9700a9f 450 crate::send_command(sock, &cmd).map_ok(|_| ()).await
321070b4
DM
451}
452
77bd2a46 453fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
4b01c983
DM
454
455 let data = line.splitn(3, ' ').collect::<Vec<&str>>();
456
457 let len = data.len();
458
459 match len {
460 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)),
461 3 => {
462 let endtime = i64::from_str_radix(data[1], 16)?;
77bd2a46
DC
463 let state = TaskState::from_endtime_and_message(endtime, data[2])?;
464 Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state)))
4b01c983
DM
465 }
466 _ => bail!("wrong number of components"),
467 }
468}
469
4c116baf 470/// Task State
77bd2a46 471#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
4c116baf
DC
472pub enum TaskState {
473 /// The Task ended with an undefined state
77bd2a46 474 Unknown { endtime: i64 },
4c116baf 475 /// The Task ended and there were no errors or warnings
77bd2a46 476 OK { endtime: i64 },
4c116baf 477 /// The Task had 'count' amount of warnings and no errors
77bd2a46 478 Warning { count: u64, endtime: i64 },
4c116baf 479 /// The Task ended with the error described in 'message'
77bd2a46 480 Error { message: String, endtime: i64 },
4c116baf
DC
481}
482
483impl TaskState {
77bd2a46
DC
484 pub fn endtime(&self) -> i64 {
485 match *self {
486 TaskState::Unknown { endtime } => endtime,
487 TaskState::OK { endtime } => endtime,
488 TaskState::Warning { endtime, .. } => endtime,
489 TaskState::Error { endtime, .. } => endtime,
4c116baf
DC
490 }
491 }
4c116baf 492
77bd2a46 493 fn result_text(&self) -> String {
4c116baf 494 match self {
77bd2a46
DC
495 TaskState::Error { message, .. } => format!("TASK ERROR: {}", message),
496 other => format!("TASK {}", other),
4c116baf
DC
497 }
498 }
4c116baf 499
77bd2a46 500 fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> {
4c116baf 501 if s == "unknown" {
77bd2a46 502 Ok(TaskState::Unknown { endtime })
4c116baf 503 } else if s == "OK" {
77bd2a46 504 Ok(TaskState::OK { endtime })
365915da
FG
505 } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") {
506 let count: u64 = warnings.parse()?;
77bd2a46 507 Ok(TaskState::Warning{ count, endtime })
3984a5fd 508 } else if !s.is_empty() {
365915da 509 let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string();
77bd2a46 510 Ok(TaskState::Error{ message, endtime })
4c116baf
DC
511 } else {
512 bail!("unable to parse Task Status '{}'", s);
513 }
514 }
515}
516
77bd2a46
DC
517impl std::cmp::PartialOrd for TaskState {
518 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
519 Some(self.endtime().cmp(&other.endtime()))
520 }
521}
522
523impl std::cmp::Ord for TaskState {
524 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
525 self.endtime().cmp(&other.endtime())
526 }
527}
528
529impl std::fmt::Display for TaskState {
530 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
531 match self {
532 TaskState::Unknown { .. } => write!(f, "unknown"),
533 TaskState::OK { .. }=> write!(f, "OK"),
534 TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count),
535 TaskState::Error { message, .. } => write!(f, "{}", message),
536 }
537 }
538}
539
93aebb38
DM
540/// Task details including parsed UPID
541///
542/// If there is no `state`, the task is still running.
543#[derive(Debug)]
544pub struct TaskListInfo {
545 /// The parsed UPID
546 pub upid: UPID,
547 /// UPID string representation
548 pub upid_str: String,
549 /// Task `(endtime, status)` if already finished
77bd2a46 550 pub state: Option<TaskState>, // endtime, status
93aebb38
DM
551}
552
bbeb0256
DC
553fn render_task_line(info: &TaskListInfo) -> String {
554 let mut raw = String::new();
555 if let Some(status) = &info.state {
556 raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status));
557 } else {
558 raw.push_str(&info.upid_str);
559 raw.push('\n');
560 }
561
562 raw
563}
564
565fn render_task_list(list: &[TaskListInfo]) -> String {
566 let mut raw = String::new();
567 for info in list {
568 raw.push_str(&render_task_line(&info));
569 }
570 raw
571}
572
784fa1c2
DC
573// note this is not locked, caller has to make sure it is
574// this will skip (and log) lines that are not valid status lines
575fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error>
576{
577 let reader = BufReader::new(reader);
578 let mut list = Vec::new();
579 for line in reader.lines() {
580 let line = line?;
581 match parse_worker_status_line(&line) {
582 Ok((upid_str, upid, state)) => list.push(TaskListInfo {
583 upid_str,
584 upid,
585 state
586 }),
587 Err(err) => {
2e44983a 588 log::warn!("unable to parse worker status '{}' - {}", line, err);
784fa1c2
DC
589 continue;
590 }
591 };
592 }
593
594 Ok(list)
595}
596
597// note this is not locked, caller has to make sure it is
598fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error>
599where
600 P: AsRef<std::path::Path> + std::fmt::Debug,
601{
602 let file = match File::open(&path) {
603 Ok(f) => f,
604 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
605 Err(err) => bail!("unable to open task list {:?} - {}", path, err),
606 };
607
608 read_task_file(file)
609}
610
2e44983a 611/// Iterate over existing/active worker tasks
e7244387
DC
612pub struct TaskListInfoIterator {
613 list: VecDeque<TaskListInfo>,
264779e7 614 end: bool,
e7244387 615 archive: Option<LogRotateFiles>,
0a33fba4 616 lock: Option<TaskListLockGuard>,
e7244387
DC
617}
618
619impl TaskListInfoIterator {
2e44983a 620 /// Creates a new iterator instance.
e7244387 621 pub fn new(active_only: bool) -> Result<Self, Error> {
0a33fba4
DM
622
623 let setup = worker_task_setup()?;
624
e7244387 625 let (read_lock, active_list) = {
0a33fba4
DM
626 let lock = setup.lock_task_list_files(false)?;
627 let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
e7244387
DC
628
629 let needs_update = active_list
630 .iter()
df4827f2 631 .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));
e7244387 632
264779e7 633 // TODO remove with 1.x
0a33fba4 634 let index_exists = setup.task_index_fn.is_file();
264779e7
DC
635
636 if needs_update || index_exists {
e7244387 637 drop(lock);
0a33fba4
DM
638 setup.update_active_workers(None)?;
639 let lock = setup.lock_task_list_files(false)?;
640 let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
e7244387
DC
641 (lock, active_list)
642 } else {
643 (lock, active_list)
644 }
645 };
646
647 let archive = if active_only {
648 None
649 } else {
0a33fba4 650 let logrotate = LogRotate::new(&setup.task_archive_fn, true)
e4f5f59e 651 .ok_or_else(|| format_err!("could not get archive file names"))?;
e7244387
DC
652 Some(logrotate.files())
653 };
654
e7244387
DC
655 let lock = if active_only { None } else { Some(read_lock) };
656
657 Ok(Self {
658 list: active_list.into(),
264779e7 659 end: active_only,
e7244387
DC
660 archive,
661 lock,
662 })
663 }
664}
665
666impl Iterator for TaskListInfoIterator {
667 type Item = Result<TaskListInfo, Error>;
668
669 fn next(&mut self) -> Option<Self::Item> {
670 loop {
671 if let Some(element) = self.list.pop_back() {
672 return Some(Ok(element));
264779e7
DC
673 } else if self.end {
674 return None;
e7244387 675 } else {
264779e7
DC
676 if let Some(mut archive) = self.archive.take() {
677 if let Some(file) = archive.next() {
678 let list = match read_task_file(file) {
679 Ok(list) => list,
e7244387
DC
680 Err(err) => return Some(Err(err)),
681 };
264779e7
DC
682 self.list.append(&mut list.into());
683 self.archive = Some(archive);
684 continue;
e7244387 685 }
e7244387 686 }
264779e7
DC
687
688 self.end = true;
689 self.lock.take();
e7244387
DC
690 }
691 }
692 }
693}
694
882594c5
DM
695/// Launch long running worker tasks.
696///
697/// A worker task can either be a whole thread, or a simply tokio
698/// task/future. Each task can `log()` messages, which are stored
699/// persistently to files. Task should poll the `abort_requested`
700/// flag, and stop execution when requested.
479f6e40 701pub struct WorkerTask {
0a33fba4 702 setup: &'static WorkerTaskSetup,
479f6e40
DM
703 upid: UPID,
704 data: Mutex<WorkerTaskData>,
705 abort_requested: AtomicBool,
706}
707
708impl std::fmt::Display for WorkerTask {
709
710 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
711 self.upid.fmt(f)
712 }
713}
714
479f6e40
DM
715struct WorkerTaskData {
716 logger: FileLogger,
717 progress: f64, // 0..1
f6de2c73 718 warn_count: u64,
75bc49be 719 pub abort_listeners: Vec<oneshot::Sender<()>>,
479f6e40
DM
720}
721
479f6e40
DM
722impl WorkerTask {
723
0a33fba4
DM
724 pub fn new(
725 worker_type: &str,
726 worker_id: Option<String>,
727 auth_id: String,
728 to_stdout: bool,
729 ) -> Result<Arc<Self>, Error> {
730
731 let setup = worker_task_setup()?;
732
e6dc35ac 733 let upid = UPID::new(worker_type, worker_id, auth_id)?;
634132fe 734 let task_id = upid.task_id;
479f6e40 735
0a33fba4 736 let mut path = setup.taskdir.clone();
35950380 737
7f3d9100 738 path.push(format!("{:02X}", upid.pstart & 255));
479f6e40 739
0a33fba4
DM
740 let dir_opts = setup.file_opts.clone()
741 .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
35950380 742
0a33fba4 743 create_path(&path, None, Some(dir_opts))?;
479f6e40
DM
744
745 path.push(upid.to_string());
746
c0df91f8 747 let logger_options = FileLogOptions {
913dddea 748 to_stdout,
c0df91f8 749 exclusive: true,
e5adbc34 750 prefix_time: true,
c0df91f8 751 read: true,
0a33fba4 752 file_opts: setup.file_opts.clone(),
c0df91f8
TL
753 ..Default::default()
754 };
755 let logger = FileLogger::new(&path, logger_options)?;
479f6e40
DM
756
757 let worker = Arc::new(Self {
0a33fba4 758 setup,
05d755b2 759 upid: upid.clone(),
479f6e40
DM
760 abort_requested: AtomicBool::new(false),
761 data: Mutex::new(WorkerTaskData {
762 logger,
763 progress: 0.0,
f6de2c73 764 warn_count: 0,
75bc49be 765 abort_listeners: vec![],
479f6e40
DM
766 }),
767 });
768
05d755b2
DC
769 // scope to drop the lock again after inserting
770 {
771 let mut hash = WORKER_TASK_LIST.lock().unwrap();
772 hash.insert(task_id, worker.clone());
b9700a9f 773 crate::set_worker_count(hash.len());
05d755b2 774 }
7a630df7 775
0a33fba4 776 setup.update_active_workers(Some(&upid))?;
479f6e40
DM
777
778 Ok(worker)
779 }
780
882594c5 781 /// Spawn a new tokio task/future.
660c6846
DM
782 pub fn spawn<F, T>(
783 worker_type: &str,
784 worker_id: Option<String>,
049a22a3 785 auth_id: String,
660c6846
DM
786 to_stdout: bool,
787 f: F,
788 ) -> Result<String, Error>
479f6e40 789 where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
75fef4b4 790 T: Send + 'static + Future<Output = Result<(), Error>>,
479f6e40 791 {
e6dc35ac 792 let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
660c6846 793 let upid_str = worker.upid.to_string();
75fef4b4
WB
794 let f = f(worker.clone());
795 tokio::spawn(async move {
796 let result = f.await;
dd8e744f 797 worker.log_result(&result);
75fef4b4 798 });
479f6e40 799
660c6846 800 Ok(upid_str)
479f6e40
DM
801 }
802
882594c5 803 /// Create a new worker thread.
660c6846
DM
804 pub fn new_thread<F>(
805 worker_type: &str,
806 worker_id: Option<String>,
049a22a3 807 auth_id: String,
660c6846
DM
808 to_stdout: bool,
809 f: F,
810 ) -> Result<String, Error>
d3f4c08f 811 where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>
479f6e40 812 {
e6dc35ac 813 let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
660c6846 814 let upid_str = worker.upid.to_string();
479f6e40 815
217170e1 816 let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || {
d3f4c08f
DM
817 let worker1 = worker.clone();
818 let result = match std::panic::catch_unwind(move || f(worker1)) {
819 Ok(r) => r,
820 Err(panic) => {
821 match panic.downcast::<&str>() {
822 Ok(panic_msg) => {
823 Err(format_err!("worker panicked: {}", panic_msg))
824 }
825 Err(_) => {
826 Err(format_err!("worker panicked: unknown type."))
827 }
828 }
829 }
830 };
831
dd8e744f 832 worker.log_result(&result);
479f6e40
DM
833 });
834
660c6846 835 Ok(upid_str)
479f6e40
DM
836 }
837
4c116baf
DC
838 /// create state from self and a result
839 pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
f6de2c73 840 let warn_count = self.data.lock().unwrap().warn_count;
cef03f41 841
6ef1b649 842 let endtime = proxmox_time::epoch_i64();
77bd2a46 843
4b01c983 844 if let Err(err) = result {
77bd2a46 845 TaskState::Error { message: err.to_string(), endtime }
f6de2c73 846 } else if warn_count > 0 {
77bd2a46 847 TaskState::Warning { count: warn_count, endtime }
4b01c983 848 } else {
77bd2a46 849 TaskState::OK { endtime }
4b01c983 850 }
cef03f41
DC
851 }
852
853 /// Log task result, remove task from running list
854 pub fn log_result(&self, result: &Result<(), Error>) {
4c116baf 855 let state = self.create_state(result);
1ec0d70d 856 self.log_message(state.result_text());
418def7a
DM
857
858 WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
0a33fba4 859 let _ = self.setup.update_active_workers(None);
b9700a9f 860 crate::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
4b01c983
DM
861 }
862
882594c5 863 /// Log a message.
1ec0d70d 864 pub fn log_message<S: AsRef<str>>(&self, msg: S) {
479f6e40
DM
865 let mut data = self.data.lock().unwrap();
866 data.logger.log(msg);
867 }
868
f6de2c73 869 /// Log a message as warning.
1ec0d70d 870 pub fn log_warning<S: AsRef<str>>(&self, msg: S) {
f6de2c73
DC
871 let mut data = self.data.lock().unwrap();
872 data.logger.log(format!("WARN: {}", msg.as_ref()));
873 data.warn_count += 1;
874 }
875
882594c5 876 /// Set progress indicator
479f6e40
DM
877 pub fn progress(&self, progress: f64) {
878 if progress >= 0.0 && progress <= 1.0 {
879 let mut data = self.data.lock().unwrap();
880 data.progress = progress;
881 } else {
882 // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
883 }
884 }
885
882594c5 886 /// Request abort
d607b886 887 pub fn request_abort(&self) {
a6c16894
DM
888 let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst);
889 if !prev_abort { // log abort one time
1ec0d70d 890 self.log_message(format!("received abort request ..."));
a6c16894 891 }
75bc49be
DM
892 // noitify listeners
893 let mut data = self.data.lock().unwrap();
894 loop {
895 match data.abort_listeners.pop() {
896 None => { break; },
897 Some(ch) => {
d1d74c43 898 let _ = ch.send(()); // ignore errors here
75bc49be
DM
899 },
900 }
901 }
479f6e40
DM
902 }
903
75bc49be
DM
904 /// Get a future which resolves on task abort
905 pub fn abort_future(&self) -> oneshot::Receiver<()> {
906 let (tx, rx) = oneshot::channel::<()>();
907
908 let mut data = self.data.lock().unwrap();
909 if self.abort_requested() {
910 let _ = tx.send(());
911 } else {
912 data.abort_listeners.push(tx);
913 }
914 rx
915 }
4bd2a9e4
DC
916
917 pub fn upid(&self) -> &UPID {
918 &self.upid
919 }
479f6e40 920}
d1993187 921
c8449217 922impl WorkerTaskContext for WorkerTask {
619cd5cb
DM
923
924 fn abort_requested(&self) -> bool {
925 self.abort_requested.load(Ordering::SeqCst)
d1993187
WB
926 }
927
0fd55b08
DM
928 fn shutdown_requested(&self) -> bool {
929 crate::shutdown_requested()
930 }
931
932 fn fail_on_shutdown(&self) -> Result<(), Error> {
933 crate::fail_on_shutdown()
934 }
935
d1993187
WB
936 fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
937 match level {
1ec0d70d
DM
938 log::Level::Error => self.log_warning(&message.to_string()),
939 log::Level::Warn => self.log_warning(&message.to_string()),
940 log::Level::Info => self.log_message(&message.to_string()),
941 log::Level::Debug => self.log_message(&format!("DEBUG: {}", message)),
942 log::Level::Trace => self.log_message(&format!("TRACE: {}", message)),
d1993187
WB
943 }
944 }
945}
72fbe9ff
WB
946
947/// Wait for a locally spanned worker task
948///
949/// Note: local workers should print logs to stdout, so there is no
950/// need to fetch/display logs. We just wait for the worker to finish.
951pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
952
953 let upid: UPID = upid_str.parse()?;
954
955 let sleep_duration = core::time::Duration::new(0, 100_000_000);
956
957 loop {
958 if worker_is_active_local(&upid) {
959 tokio::time::sleep(sleep_duration).await;
960 } else {
961 break;
962 }
963 }
964 Ok(())
965}
3f742f95
DC
966
967/// Request abort of a local worker (if existing and running)
968pub fn abort_local_worker(upid: UPID) {
969 if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) {
970 worker.request_abort();
971 }
972}