use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::{BufRead, BufReader, Read, Write};
use std::panic::UnwindSafe;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use anyhow::{bail, format_err, Error};
use futures::*;
use lazy_static::lazy_static;
use nix::fcntl::OFlag;
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tokio::sync::oneshot;

use proxmox_lang::try_block;
use proxmox_schema::upid::UPID;
use proxmox_sys::fs::{atomic_open_or_create_file, create_path, replace_file, CreateOptions};
use proxmox_sys::linux::procfs;
use proxmox_sys::task_warn;

use proxmox_sys::logrotate::{LogRotate, LogRotateFiles};
use proxmox_sys::WorkerTaskContext;

use crate::{CommandSocket, FileLogOptions, FileLogger};

struct TaskListLockGuard(File);

struct WorkerTaskSetup {
    file_opts: CreateOptions,
    taskdir: PathBuf,
    task_lock_fn: PathBuf,
    active_tasks_fn: PathBuf,
    task_index_fn: PathBuf,
    task_archive_fn: PathBuf,
}

static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();

fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
    WORKER_TASK_SETUP
        .get()
        .ok_or_else(|| format_err!("WorkerTask library is not initialized"))
}

impl WorkerTaskSetup {
    fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {
        let mut taskdir = basedir;
        taskdir.push("tasks");

        let mut task_lock_fn = taskdir.clone();
        task_lock_fn.push(".active.lock");

        let mut active_tasks_fn = taskdir.clone();
        active_tasks_fn.push("active");

        let mut task_index_fn = taskdir.clone();
        task_index_fn.push("index");

        let mut task_archive_fn = taskdir.clone();
        task_archive_fn.push("archive");

        Self {
            file_opts,
            taskdir,
            task_lock_fn,
            active_tasks_fn,
            task_index_fn,
            task_archive_fn,
        }
    }

    fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {
        let options = self
            .file_opts
            .clone()
            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));

        let timeout = std::time::Duration::new(10, 0);

        let file =
            proxmox_sys::fs::open_file_locked(&self.task_lock_fn, timeout, exclusive, options)?;

        Ok(TaskListLockGuard(file))
    }

    fn log_path(&self, upid: &UPID) -> std::path::PathBuf {
        let mut path = self.taskdir.clone();
        path.push(format!("{:02X}", upid.pstart % 256));
        path.push(upid.to_string());
        path
    }

    // atomically read/update the task list, update status of finished tasks
    // new_upid is added to the list when specified.
    fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {
        let lock = self.lock_task_list_files(true)?;

        // TODO remove with 1.x
        let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?;
        let had_index_file = !finish_list.is_empty();

        // We use filter_map because one negative case wants to *move* the data into `finish_list`,
        // clippy doesn't quite catch this!
        #[allow(clippy::unnecessary_filter_map)]
        let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)?
            .into_iter()
            .filter_map(|info| {
                if info.state.is_some() {
                    // this can happen when the active file still includes finished tasks
                    finish_list.push(info);
                    return None;
                }

                if !worker_is_active_local(&info.upid) {
                    // println!("Detected stopped task '{}'", &info.upid_str);
                    let now = proxmox_time::epoch_i64();
                    let status =
                        upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
                    finish_list.push(TaskListInfo {
                        upid: info.upid,
                        upid_str: info.upid_str,
                        state: Some(status),
                    });
                    return None;
                }

                Some(info)
            })
            .collect();

        if let Some(upid) = new_upid {
            active_list.push(TaskListInfo {
                upid: upid.clone(),
                upid_str: upid.to_string(),
                state: None,
            });
        }

        let active_raw = render_task_list(&active_list);

        let options = self
            .file_opts
            .clone()
            .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));

        replace_file(&self.active_tasks_fn, active_raw.as_bytes(), options, false)?;

        finish_list.sort_unstable_by(|a, b| match (&a.state, &b.state) {
            (Some(s1), Some(s2)) => s1.cmp(s2),
            (Some(_), None) => std::cmp::Ordering::Less,
            (None, Some(_)) => std::cmp::Ordering::Greater,
            _ => a.upid.starttime.cmp(&b.upid.starttime),
        });

        if !finish_list.is_empty() {
            let options = self
                .file_opts
                .clone()
                .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));

            let mut writer = atomic_open_or_create_file(
                &self.task_archive_fn,
                OFlag::O_APPEND | OFlag::O_RDWR,
                &[],
                options,
                false,
            )?;
            for info in &finish_list {
                writer.write_all(render_task_line(info).as_bytes())?;
            }
        }

        // TODO Remove with 1.x
        // for compatibility, if we had an INDEX file, we do not need it anymore
        if had_index_file {
            let _ = nix::unistd::unlink(&self.task_index_fn);
        }

        drop(lock);

        Ok(())
    }

    // Create task log directory with correct permissions
    fn create_task_log_dirs(&self) -> Result<(), Error> {
        try_block!({
            let dir_opts = self
                .file_opts
                .clone()
                .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));

            create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts))?;
            // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
            Ok(())
        })
        .map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
    }
}

/// Initialize the WorkerTask library
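///
/// Must be called once at process startup, before any worker task is
/// created. A minimal sketch (the base directory and permissions are
/// illustrative values, and `CreateOptions::new()` is assumed to be the
/// usual proxmox-sys builder entry point):
///
/// ```ignore
/// use proxmox_sys::fs::CreateOptions;
///
/// // file creation options used for the task dir and log files (example mode)
/// let file_opts = CreateOptions::new()
///     .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
///
/// init_worker_tasks("/var/log/example-server".into(), file_opts)?;
/// ```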
pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
    let setup = WorkerTaskSetup::new(basedir, file_opts);
    setup.create_task_log_dirs()?;
    WORKER_TASK_SETUP
        .set(setup)
        .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
}

/// Checks if the task archive is bigger than 'size_threshold' bytes, and
/// rotates it if so, keeping only up to 'max_files' archive files.
/// If 'max_days' is given, 'max_files' is ignored and archive files are
/// deleted once they contain only tasks older than the given number of days.
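///
/// # Example
///
/// A hedged sketch of a periodic maintenance call; threshold and retention
/// values are arbitrary examples:
///
/// ```ignore
/// // rotate once the archive grows past ~500 KiB, keep at most 7 compressed files
/// let rotated = rotate_task_log_archive(500 * 1024, true, Some(7), None, None)?;
/// if rotated {
///     log::info!("task archive was rotated");
/// }
/// ```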
pub fn rotate_task_log_archive(
    size_threshold: u64,
    compress: bool,
    max_files: Option<usize>,
    max_days: Option<usize>,
    options: Option<CreateOptions>,
) -> Result<bool, Error> {
    let setup = worker_task_setup()?;

    let _lock = setup.lock_task_list_files(true)?;

    let mut logrotate = LogRotate::new(
        &setup.task_archive_fn,
        compress,
        if max_days.is_none() { max_files } else { None },
        options,
    )?;

    let mut rotated = logrotate.rotate(size_threshold)?;

    if let Some(max_days) = max_days {
        let mut delete = false;
        let file_names = logrotate.file_names();
        let mut files = logrotate.files();
        // task archives have task logs sorted by endtime, with the oldest at the start of the
        // file. So, peek into every archive and check whether the first listed task's endtime
        // would be cut off. If that's the case, we know that every following (older) task
        // archive falls completely behind the cut-off line. We cannot say the same for the
        // current archive without checking its last (newest) line, but that is more complex,
        // expensive, and rather unlikely to matter.
        for file_name in file_names {
            if !delete {
                // we only have to check if we did not start deleting already

                // this is ok because the task log files are locked, so no one
                // else should modify these
                let reader = match files.next() {
                    Some(file) => BufReader::new(file),
                    None => {
                        bail!("unexpected error: files do not match file_names");
                    }
                };
                if let Some(line) = reader.lines().next() {
                    if let Ok((_, _, Some(state))) = parse_worker_status_line(&line?) {
                        // we approximate here with the days, but should be close enough
                        let cutoff_time =
                            proxmox_time::epoch_i64() - (max_days * 24 * 60 * 60) as i64;
                        if state.endtime() < cutoff_time {
                            // we found the first file that has only older entries, start deleting
                            delete = true;
                            rotated = true;
                        }
                    }
                }
            } else if let Err(err) = std::fs::remove_file(&file_name) {
                log::error!("could not remove {:?}: {}", file_name, err);
            }
        }
    }

    Ok(rotated)
}

/// Removes all task logs that are older than the oldest task entry in the
/// task archive.
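///
/// # Example
///
/// Usually called from within a maintenance worker so that warnings end up
/// in that task's log; a sketch (the surrounding worker task is assumed and
/// not shown):
///
/// ```ignore
/// // `worker` is an Arc<WorkerTask>, which implements WorkerTaskContext
/// cleanup_old_tasks(&*worker, true)?;
/// ```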
pub fn cleanup_old_tasks(worker: &dyn WorkerTaskContext, compressed: bool) -> Result<(), Error> {
    let setup = worker_task_setup()?;

    let _lock = setup.lock_task_list_files(true)?;

    let logrotate = LogRotate::new(&setup.task_archive_fn, compressed, None, None)?;

    let mut timestamp = None;
    if let Some(last_file) = logrotate.files().last() {
        let reader = BufReader::new(last_file);
        for line in reader.lines() {
            let line = line?;
            if let Ok((_, _, Some(state))) = parse_worker_status_line(&line) {
                timestamp = Some(state.endtime());
                break;
            }
        }
    }

    fn get_modified(entry: std::fs::DirEntry) -> Result<SystemTime, std::io::Error> {
        entry.metadata()?.modified()
    }

    if let Some(timestamp) = timestamp {
        let cutoff_time = if timestamp > 0 {
            SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(timestamp as u64))
        } else {
            SystemTime::UNIX_EPOCH.checked_sub(Duration::from_secs(-timestamp as u64))
        }
        .ok_or_else(|| format_err!("could not calculate cutoff time"))?;

        for i in 0..256 {
            let mut path = setup.taskdir.clone();
            path.push(format!("{:02X}", i));
            let files = match std::fs::read_dir(path) {
                Ok(files) => files,
                Err(err) if err.kind() == std::io::ErrorKind::NotFound => continue,
                Err(err) => {
                    task_warn!(worker, "could not check task logs in '{:02X}': {}", i, err);
                    continue;
                }
            };
            for file in files {
                let file = match file {
                    Ok(file) => file,
                    Err(err) => {
                        task_warn!(
                            worker,
                            "could not check some task log in '{:02X}': {}",
                            i,
                            err
                        );
                        continue;
                    }
                };
                let path = file.path();

                let modified = match get_modified(file) {
                    Ok(modified) => modified,
                    Err(err) => {
                        task_warn!(worker, "error getting mtime for '{:?}': {}", path, err);
                        continue;
                    }
                };

                if modified < cutoff_time {
                    match std::fs::remove_file(&path) {
                        Ok(()) => {}
                        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
                        Err(err) => {
                            task_warn!(worker, "could not remove file '{:?}': {}", path, err)
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

/// Path to the worker log file
pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
    let setup = worker_task_setup()?;
    Ok(setup.log_path(upid))
}

/// Read the endtime (time of the last log line) and exit status from a task log file.
/// If there is not a single line with a valid datetime, we assume the
/// starttime to be the endtime.
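///
/// # Example
///
/// A sketch; `upid_str` stands in for a real task identifier string:
///
/// ```ignore
/// let upid: UPID = upid_str.parse()?;
/// match upid_read_status(&upid)? {
///     TaskState::OK { endtime } => println!("finished OK at {}", endtime),
///     TaskState::Warning { count, .. } => println!("finished with {} warnings", count),
///     TaskState::Error { message, .. } => println!("failed: {}", message),
///     TaskState::Unknown { .. } => println!("no parseable status line found"),
/// }
/// ```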
pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
    let setup = worker_task_setup()?;

    let mut status = TaskState::Unknown {
        endtime: upid.starttime,
    };

    let path = setup.log_path(upid);

    let mut file = File::open(path)?;

    /// speedup - only read tail
    use std::io::Seek;
    use std::io::SeekFrom;
    let _ = file.seek(SeekFrom::End(-8192)); // ignore errors

    let mut data = Vec::with_capacity(8192);
    file.read_to_end(&mut data)?;

    // strip newlines at the end of the task logs
    while data.last() == Some(&b'\n') {
        data.pop();
    }

    let last_line = match data.iter().rposition(|c| *c == b'\n') {
        Some(start) if data.len() > (start + 1) => &data[start + 1..],
        Some(_) => &data, // should not happen, since we removed all trailing newlines
        None => &data,
    };

    let last_line = std::str::from_utf8(last_line)
        .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?;

    let mut iter = last_line.splitn(2, ": ");
    if let Some(time_str) = iter.next() {
        if let Ok(endtime) = proxmox_time::parse_rfc3339(time_str) {
            // set the endtime even if we cannot parse the state
            status = TaskState::Unknown { endtime };
            if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) {
                if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) {
                    status = state;
                }
            }
        }
    }

    Ok(status)
}

lazy_static! {
    static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> =
        Mutex::new(HashMap::new());
}

/// checks if the task UPID refers to a worker from this process
fn is_local_worker(upid: &UPID) -> bool {
    upid.pid == crate::pid() && upid.pstart == crate::pstart()
}

/// Test if the task is still running
pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
    if is_local_worker(upid) {
        return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
    }

    if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() {
        return Ok(false);
    }

    let sock = crate::ctrl_sock_from_pid(upid.pid);
    let cmd = json!({
        "command": "worker-task-status",
        "args": {
            "upid": upid.to_string(),
        },
    });
    let status = crate::send_command(sock, &cmd).await?;

    if let Some(active) = status.as_bool() {
        Ok(active)
    } else {
        bail!("got unexpected result {:?} (expected bool)", status);
    }
}

/// Test if the task is still running (fast but inaccurate implementation)
///
/// If the task is spawned from a different process, we simply return whether
/// that process is still running. This information is good enough to detect
/// stale tasks...
pub fn worker_is_active_local(upid: &UPID) -> bool {
    if is_local_worker(upid) {
        WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
    } else {
        procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
    }
}

/// Register task control commands on a [CommandSocket].
///
/// This creates two commands:
///
/// * ``worker-task-abort <UPID>``: calls [abort_local_worker]
///
/// * ``worker-task-status <UPID>``: returns true or false, depending on
/// whether the worker is running or stopped.
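///
/// # Example
///
/// A sketch of daemon setup code; the [CommandSocket] is assumed to be
/// created elsewhere during initialization:
///
/// ```ignore
/// fn setup_control_socket(commando_sock: &mut CommandSocket) -> Result<(), Error> {
///     // after this, worker-task-abort/worker-task-status requests on the
///     // control socket are answered by this process
///     register_task_control_commands(commando_sock)?;
///     Ok(())
/// }
/// ```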
pub fn register_task_control_commands(commando_sock: &mut CommandSocket) -> Result<(), Error> {
    fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
        let args = if let Some(args) = args {
            args
        } else {
            bail!("missing args")
        };
        let upid = match args.get("upid") {
            Some(Value::String(upid)) => upid.parse::<UPID>()?,
            None => bail!("no upid in args"),
            _ => bail!("unable to parse upid"),
        };
        if !is_local_worker(&upid) {
            bail!("upid does not belong to this process");
        }
        Ok(upid)
    }

    commando_sock.register_command("worker-task-abort".into(), move |args| {
        let upid = get_upid(args)?;

        abort_local_worker(upid);

        Ok(Value::Null)
    })?;
    commando_sock.register_command("worker-task-status".into(), move |args| {
        let upid = get_upid(args)?;

        let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id);

        Ok(active.into())
    })?;

    Ok(())
}

/// Try to abort a worker task, but do not wait
///
/// Errors (if any) are simply logged.
pub fn abort_worker_nowait(upid: UPID) {
    tokio::spawn(async move {
        if let Err(err) = abort_worker(upid).await {
            log::error!("abort worker task failed - {}", err);
        }
    });
}

/// Abort a worker task
///
/// By sending ``worker-task-abort`` to the control socket.
pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
    let sock = crate::ctrl_sock_from_pid(upid.pid);
    let cmd = json!({
        "command": "worker-task-abort",
        "args": {
            "upid": upid.to_string(),
        },
    });
    crate::send_command(sock, &cmd).map_ok(|_| ()).await
}

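// Parses one line of the `active`/`archive` task list files. Two forms occur
// (see `render_task_line`): a bare UPID for a still running task, or
// "<upid> <endtime as 8-digit uppercase hex> <status text>" for a finished one.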
fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
    let data = line.splitn(3, ' ').collect::<Vec<&str>>();

    let len = data.len();

    match len {
        1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)),
        3 => {
            let endtime = i64::from_str_radix(data[1], 16)?;
            let state = TaskState::from_endtime_and_message(endtime, data[2])?;
            Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state)))
        }
        _ => bail!("wrong number of components"),
    }
}

/// Task State
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskState {
    /// The Task ended with an undefined state
    Unknown { endtime: i64 },
    /// The Task ended and there were no errors or warnings
    OK { endtime: i64 },
    /// The Task had 'count' warnings and no errors
    Warning { count: u64, endtime: i64 },
    /// The Task ended with the error described in 'message'
    Error { message: String, endtime: i64 },
}

impl TaskState {
    pub fn endtime(&self) -> i64 {
        match *self {
            TaskState::Unknown { endtime } => endtime,
            TaskState::OK { endtime } => endtime,
            TaskState::Warning { endtime, .. } => endtime,
            TaskState::Error { endtime, .. } => endtime,
        }
    }

    fn result_text(&self) -> String {
        match self {
            TaskState::Error { message, .. } => format!("TASK ERROR: {}", message),
            other => format!("TASK {}", other),
        }
    }

    fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> {
        if s == "unknown" {
            Ok(TaskState::Unknown { endtime })
        } else if s == "OK" {
            Ok(TaskState::OK { endtime })
        } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") {
            let count: u64 = warnings.parse()?;
            Ok(TaskState::Warning { count, endtime })
        } else if !s.is_empty() {
            let message = if let Some(err) = s.strip_prefix("ERROR: ") {
                err
            } else {
                s
            }
            .to_string();
            Ok(TaskState::Error { message, endtime })
        } else {
            bail!("unable to parse Task Status '{}'", s);
        }
    }
}

impl std::cmp::PartialOrd for TaskState {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.endtime().cmp(&other.endtime()))
    }
}

impl std::cmp::Ord for TaskState {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.endtime().cmp(&other.endtime())
    }
}

impl std::fmt::Display for TaskState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TaskState::Unknown { .. } => write!(f, "unknown"),
            TaskState::OK { .. } => write!(f, "OK"),
            TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count),
            TaskState::Error { message, .. } => write!(f, "{}", message),
        }
    }
}

/// Task details including parsed UPID
///
/// If there is no `state`, the task is still running.
#[derive(Debug)]
pub struct TaskListInfo {
    /// The parsed UPID
    pub upid: UPID,
    /// UPID string representation
    pub upid_str: String,
    /// Task `(endtime, status)` if already finished
    pub state: Option<TaskState>, // endtime, status
}

fn render_task_line(info: &TaskListInfo) -> String {
    let mut raw = String::new();
    if let Some(status) = &info.state {
        raw.push_str(&format!(
            "{} {:08X} {}\n",
            info.upid_str,
            status.endtime(),
            status
        ));
    } else {
        raw.push_str(&info.upid_str);
        raw.push('\n');
    }

    raw
}

fn render_task_list(list: &[TaskListInfo]) -> String {
    let mut raw = String::new();
    for info in list {
        raw.push_str(&render_task_line(info));
    }
    raw
}

// note this is not locked, caller has to make sure it is
// this will skip (and log) lines that are not valid status lines
fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error> {
    let reader = BufReader::new(reader);
    let mut list = Vec::new();
    for line in reader.lines() {
        let line = line?;
        match parse_worker_status_line(&line) {
            Ok((upid_str, upid, state)) => list.push(TaskListInfo {
                upid_str,
                upid,
                state,
            }),
            Err(err) => {
                log::warn!("unable to parse worker status '{}' - {}", line, err);
                continue;
            }
        };
    }

    Ok(list)
}

// note this is not locked, caller has to make sure it is
fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error>
where
    P: AsRef<std::path::Path> + std::fmt::Debug,
{
    let file = match File::open(&path) {
        Ok(f) => f,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
        Err(err) => bail!("unable to open task list {:?} - {}", path, err),
    };

    read_task_file(file)
}

/// Iterate over existing/active worker tasks
pub struct TaskListInfoIterator {
    list: VecDeque<TaskListInfo>,
    end: bool,
    archive: Option<LogRotateFiles>,
    lock: Option<TaskListLockGuard>,
}

impl TaskListInfoIterator {
    /// Creates a new iterator instance.
    pub fn new(active_only: bool) -> Result<Self, Error> {
        let setup = worker_task_setup()?;

        let (read_lock, active_list) = {
            let lock = setup.lock_task_list_files(false)?;
            let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;

            let needs_update = active_list
                .iter()
                .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid));

            // TODO remove with 1.x
            let index_exists = setup.task_index_fn.is_file();

            if needs_update || index_exists {
                drop(lock);
                setup.update_active_workers(None)?;
                let lock = setup.lock_task_list_files(false)?;
                let active_list = read_task_file_from_path(&setup.active_tasks_fn)?;
                (lock, active_list)
            } else {
                (lock, active_list)
            }
        };

        let archive = if active_only {
            None
        } else {
            let logrotate = LogRotate::new(&setup.task_archive_fn, true, None, None)?;
            Some(logrotate.files())
        };

        let lock = if active_only { None } else { Some(read_lock) };

        Ok(Self {
            list: active_list.into(),
            end: active_only,
            archive,
            lock,
        })
    }
}

impl Iterator for TaskListInfoIterator {
    type Item = Result<TaskListInfo, Error>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(element) = self.list.pop_back() {
                return Some(Ok(element));
            } else if self.end {
                return None;
            } else {
                if let Some(mut archive) = self.archive.take() {
                    if let Some(file) = archive.next() {
                        let list = match read_task_file(file) {
                            Ok(list) => list,
                            Err(err) => return Some(Err(err)),
                        };
                        self.list.append(&mut list.into());
                        self.archive = Some(archive);
                        continue;
                    }
                }

                self.end = true;
                self.lock.take();
            }
        }
    }
}

/// Launch long-running worker tasks.
///
/// A worker task can either be a whole thread, or simply a tokio
/// task/future. Each task can `log()` messages, which are stored
/// persistently to files. Tasks should poll the `abort_requested`
/// flag, and stop execution when requested.
pub struct WorkerTask {
    setup: &'static WorkerTaskSetup,
    upid: UPID,
    data: Mutex<WorkerTaskData>,
    abort_requested: AtomicBool,
}

impl std::fmt::Display for WorkerTask {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        self.upid.fmt(f)
    }
}

struct WorkerTaskData {
    logger: FileLogger,
    progress: f64, // 0..1
    warn_count: u64,
    pub abort_listeners: Vec<oneshot::Sender<()>>,
}

impl WorkerTask {
    pub fn new(
        worker_type: &str,
        worker_id: Option<String>,
        auth_id: String,
        to_stdout: bool,
    ) -> Result<Arc<Self>, Error> {
        let setup = worker_task_setup()?;

        let upid = UPID::new(worker_type, worker_id, auth_id)?;
        let task_id = upid.task_id;

        let mut path = setup.taskdir.clone();

        path.push(format!("{:02X}", upid.pstart & 255));

        let dir_opts = setup
            .file_opts
            .clone()
            .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));

        create_path(&path, None, Some(dir_opts))?;

        path.push(upid.to_string());

        let logger_options = FileLogOptions {
            to_stdout,
            exclusive: true,
            prefix_time: true,
            read: true,
            file_opts: setup.file_opts.clone(),
            ..Default::default()
        };
        let logger = FileLogger::new(&path, logger_options)?;

        let worker = Arc::new(Self {
            setup,
            upid: upid.clone(),
            abort_requested: AtomicBool::new(false),
            data: Mutex::new(WorkerTaskData {
                logger,
                progress: 0.0,
                warn_count: 0,
                abort_listeners: vec![],
            }),
        });

        // scope to drop the lock again after inserting
        {
            let mut hash = WORKER_TASK_LIST.lock().unwrap();
            hash.insert(task_id, worker.clone());
            crate::set_worker_count(hash.len());
        }

        setup.update_active_workers(Some(&upid))?;

        Ok(worker)
    }

    /// Spawn a new tokio task/future.
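    ///
    /// # Example
    ///
    /// A sketch; worker type, id and auth id are placeholder values:
    ///
    /// ```ignore
    /// let upid_str = WorkerTask::spawn(
    ///     "exampletask",
    ///     None,
    ///     "root@pam".to_string(),
    ///     false,
    ///     |worker| async move {
    ///         worker.log_message("doing some async work");
    ///         Ok(())
    ///     },
    /// )?;
    /// ```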
    pub fn spawn<F, T>(
        worker_type: &str,
        worker_id: Option<String>,
        auth_id: String,
        to_stdout: bool,
        f: F,
    ) -> Result<String, Error>
    where
        F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
        T: Send + 'static + Future<Output = Result<(), Error>>,
    {
        let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
        let upid_str = worker.upid.to_string();
        let f = f(worker.clone());
        tokio::spawn(async move {
            let result = f.await;
            worker.log_result(&result);
        });

        Ok(upid_str)
    }

    /// Create a new worker thread.
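    ///
    /// # Example
    ///
    /// A sketch of a blocking worker that polls the abort flag; the loop body
    /// stands in for real work and all identifiers are placeholder values:
    ///
    /// ```ignore
    /// use proxmox_sys::WorkerTaskContext;
    ///
    /// let upid_str = WorkerTask::new_thread(
    ///     "exampletask",
    ///     None,
    ///     "root@pam".to_string(),
    ///     false,
    ///     |worker| {
    ///         for step in 0..10 {
    ///             if worker.abort_requested() {
    ///                 break;
    ///             }
    ///             worker.log_message(format!("step {}", step));
    ///             // ... do one chunk of blocking work ...
    ///         }
    ///         Ok(())
    ///     },
    /// )?;
    /// ```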
    pub fn new_thread<F>(
        worker_type: &str,
        worker_id: Option<String>,
        auth_id: String,
        to_stdout: bool,
        f: F,
    ) -> Result<String, Error>
    where
        F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>,
    {
        let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
        let upid_str = worker.upid.to_string();

        let _child = std::thread::Builder::new()
            .name(upid_str.clone())
            .spawn(move || {
                let worker1 = worker.clone();
                let result = match std::panic::catch_unwind(move || f(worker1)) {
                    Ok(r) => r,
                    Err(panic) => match panic.downcast::<&str>() {
                        Ok(panic_msg) => Err(format_err!("worker panicked: {}", panic_msg)),
                        Err(_) => Err(format_err!("worker panicked: unknown type.")),
                    },
                };

                worker.log_result(&result);
            });

        Ok(upid_str)
    }

    /// Create a state from self and a result
    pub fn create_state(&self, result: &Result<(), Error>) -> TaskState {
        let warn_count = self.data.lock().unwrap().warn_count;

        let endtime = proxmox_time::epoch_i64();

        if let Err(err) = result {
            TaskState::Error {
                message: err.to_string(),
                endtime,
            }
        } else if warn_count > 0 {
            TaskState::Warning {
                count: warn_count,
                endtime,
            }
        } else {
            TaskState::OK { endtime }
        }
    }

    /// Log task result, remove task from running list
    pub fn log_result(&self, result: &Result<(), Error>) {
        let state = self.create_state(result);
        self.log_message(state.result_text());

        WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
        let _ = self.setup.update_active_workers(None);
        crate::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
    }

    /// Log a message.
    pub fn log_message<S: AsRef<str>>(&self, msg: S) {
        let mut data = self.data.lock().unwrap();
        data.logger.log(msg);
    }

    /// Log a message as warning.
    pub fn log_warning<S: AsRef<str>>(&self, msg: S) {
        let mut data = self.data.lock().unwrap();
        data.logger.log(format!("WARN: {}", msg.as_ref()));
        data.warn_count += 1;
    }

    /// Set progress indicator
    pub fn progress(&self, progress: f64) {
        if progress >= 0.0 && progress <= 1.0 {
            let mut data = self.data.lock().unwrap();
            data.progress = progress;
        } else {
            // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
        }
    }

    /// Request abort
    pub fn request_abort(&self) {
        let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst);
        if !prev_abort {
            // log abort only once
            self.log_message("received abort request ...".to_string());
        }
        // notify listeners
        let mut data = self.data.lock().unwrap();
        loop {
            match data.abort_listeners.pop() {
                None => {
                    break;
                }
                Some(ch) => {
                    let _ = ch.send(()); // ignore errors here
                }
            }
        }
    }

    /// Get a future which resolves on task abort
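    ///
    /// # Example
    ///
    /// A sketch for racing some work against an abort request; `do_work()`
    /// and the surrounding async context are assumed placeholders:
    ///
    /// ```ignore
    /// use anyhow::format_err;
    ///
    /// tokio::select! {
    ///     res = do_work() => res,
    ///     _ = worker.abort_future() => Err(format_err!("task aborted")),
    /// }
    /// ```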
    pub fn abort_future(&self) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel::<()>();

        let mut data = self.data.lock().unwrap();
        if self.abort_requested() {
            let _ = tx.send(());
        } else {
            data.abort_listeners.push(tx);
        }
        rx
    }

    pub fn upid(&self) -> &UPID {
        &self.upid
    }
}

impl WorkerTaskContext for WorkerTask {
    fn abort_requested(&self) -> bool {
        self.abort_requested.load(Ordering::SeqCst)
    }

    fn shutdown_requested(&self) -> bool {
        crate::shutdown_requested()
    }

    fn fail_on_shutdown(&self) -> Result<(), Error> {
        crate::fail_on_shutdown()
    }

    fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
        match level {
            log::Level::Error => self.log_warning(&message.to_string()),
            log::Level::Warn => self.log_warning(&message.to_string()),
            log::Level::Info => self.log_message(&message.to_string()),
            log::Level::Debug => self.log_message(&format!("DEBUG: {}", message)),
            log::Level::Trace => self.log_message(&format!("TRACE: {}", message)),
        }
    }
}

/// Wait for a locally spawned worker task
///
/// Note: local workers should print logs to stdout, so there is no
/// need to fetch/display logs. We just wait for the worker to finish.
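///
/// # Example
///
/// A sketch; `upid_str` is assumed to come from a previous
/// [WorkerTask::spawn] or [WorkerTask::new_thread] call:
///
/// ```ignore
/// wait_for_local_worker(&upid_str).await?;
/// ```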
pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
    let upid: UPID = upid_str.parse()?;

    let sleep_duration = core::time::Duration::new(0, 100_000_000);

    loop {
        if worker_is_active_local(&upid) {
            tokio::time::sleep(sleep_duration).await;
        } else {
            break;
        }
    }
    Ok(())
}

/// Request abort of a local worker (if existing and running)
pub fn abort_local_worker(upid: UPID) {
    if let Some(worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) {
        worker.request_abort();
    }
}