]>
Commit | Line | Data |
---|---|---|
e7244387 | 1 | use std::collections::{HashMap, VecDeque}; |
4b01c983 | 2 | use std::fs::File; |
0a33fba4 | 3 | use std::path::PathBuf; |
5ade6c25 | 4 | use std::io::{Read, Write, BufRead, BufReader}; |
d3f4c08f | 5 | use std::panic::UnwindSafe; |
18c0df4c WB |
6 | use std::sync::atomic::{AtomicBool, Ordering}; |
7 | use std::sync::{Arc, Mutex}; | |
de55fff2 | 8 | use std::time::{SystemTime, Duration}; |
d3f4c08f | 9 | |
f7d4e4b5 | 10 | use anyhow::{bail, format_err, Error}; |
18c0df4c WB |
11 | use futures::*; |
12 | use lazy_static::lazy_static; | |
321070b4 | 13 | use serde_json::{json, Value}; |
4c116baf | 14 | use serde::{Serialize, Deserialize}; |
18c0df4c | 15 | use tokio::sync::oneshot; |
0a33fba4 DM |
16 | use nix::fcntl::OFlag; |
17 | use once_cell::sync::OnceCell; | |
479f6e40 | 18 | |
25877d05 DM |
19 | use proxmox_sys::linux::procfs; |
20 | use proxmox_sys::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions}; | |
6ef1b649 WB |
21 | use proxmox_lang::try_block; |
22 | use proxmox_schema::upid::UPID; | |
e18a6c9e | 23 | |
25877d05 | 24 | use proxmox_sys::WorkerTaskContext; |
d5790a9f | 25 | use proxmox_sys::logrotate::{LogRotate, LogRotateFiles}; |
b9700a9f | 26 | |
49e25688 | 27 | use crate::{CommandSocket, FileLogger, FileLogOptions}; |
6c76aa43 | 28 | |
/// Guard for the task list lock file.
///
/// Holds the opened (and locked) lock file; callers release the lock by
/// dropping this value.
struct TaskListLockGuard(std::fs::File);
af06decd | 30 | |
0a33fba4 DM |
31 | struct WorkerTaskSetup { |
32 | file_opts: CreateOptions, | |
33 | taskdir: PathBuf, | |
34 | task_lock_fn: PathBuf, | |
35 | active_tasks_fn: PathBuf, | |
36 | task_index_fn: PathBuf, | |
37 | task_archive_fn: PathBuf, | |
38 | } | |
39 | ||
40 | static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new(); | |
41 | ||
42 | fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> { | |
43 | WORKER_TASK_SETUP.get() | |
44 | .ok_or_else(|| format_err!("WorkerTask library is not initialized")) | |
45 | } | |
46 | ||
47 | impl WorkerTaskSetup { | |
48 | ||
49 | fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self { | |
50 | ||
51 | let mut taskdir = basedir.clone(); | |
52 | taskdir.push("tasks"); | |
53 | ||
54 | let mut task_lock_fn = taskdir.clone(); | |
55 | task_lock_fn.push(".active.lock"); | |
56 | ||
57 | let mut active_tasks_fn = taskdir.clone(); | |
58 | active_tasks_fn.push("active"); | |
59 | ||
60 | let mut task_index_fn = taskdir.clone(); | |
61 | task_index_fn.push("index"); | |
62 | ||
63 | let mut task_archive_fn = taskdir.clone(); | |
64 | task_archive_fn.push("archive"); | |
65 | ||
66 | Self { | |
67 | file_opts, | |
68 | taskdir, | |
69 | task_lock_fn, | |
70 | active_tasks_fn, | |
71 | task_index_fn, | |
72 | task_archive_fn, | |
73 | } | |
74 | } | |
75 | ||
76 | fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> { | |
77 | let options = self.file_opts.clone() | |
78 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); | |
79 | ||
80 | let timeout = std::time::Duration::new(10, 0); | |
81 | ||
25877d05 | 82 | let file = proxmox_sys::fs::open_file_locked( |
0a33fba4 DM |
83 | &self.task_lock_fn, |
84 | timeout, | |
85 | exclusive, | |
86 | options, | |
87 | )?; | |
88 | ||
89 | Ok(TaskListLockGuard(file)) | |
90 | } | |
91 | ||
92 | fn log_path(&self, upid: &UPID) -> std::path::PathBuf { | |
93 | let mut path = self.taskdir.clone(); | |
94 | path.push(format!("{:02X}", upid.pstart % 256)); | |
95 | path.push(upid.to_string()); | |
96 | path | |
97 | } | |
98 | ||
99 | // atomically read/update the task list, update status of finished tasks | |
100 | // new_upid is added to the list when specified. | |
101 | fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> { | |
102 | ||
103 | let lock = self.lock_task_list_files(true)?; | |
104 | ||
105 | // TODO remove with 1.x | |
106 | let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?; | |
107 | let had_index_file = !finish_list.is_empty(); | |
108 | ||
109 | // We use filter_map because one negative case wants to *move* the data into `finish_list`, | |
110 | // clippy doesn't quite catch this! | |
111 | #[allow(clippy::unnecessary_filter_map)] | |
112 | let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)? | |
113 | .into_iter() | |
114 | .filter_map(|info| { | |
115 | if info.state.is_some() { | |
116 | // this can happen when the active file still includes finished tasks | |
117 | finish_list.push(info); | |
118 | return None; | |
119 | } | |
120 | ||
121 | if !worker_is_active_local(&info.upid) { | |
122 | // println!("Detected stopped task '{}'", &info.upid_str); | |
6ef1b649 | 123 | let now = proxmox_time::epoch_i64(); |
0a33fba4 DM |
124 | let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now }); |
125 | finish_list.push(TaskListInfo { | |
126 | upid: info.upid, | |
127 | upid_str: info.upid_str, | |
128 | state: Some(status) | |
129 | }); | |
130 | return None; | |
131 | } | |
132 | ||
133 | Some(info) | |
134 | }).collect(); | |
135 | ||
136 | if let Some(upid) = new_upid { | |
137 | active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None }); | |
138 | } | |
139 | ||
140 | let active_raw = render_task_list(&active_list); | |
141 | ||
142 | let options = self.file_opts.clone() | |
143 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); | |
144 | ||
145 | replace_file( | |
146 | &self.active_tasks_fn, | |
147 | active_raw.as_bytes(), | |
148 | options, | |
e0a19d33 | 149 | false, |
0a33fba4 DM |
150 | )?; |
151 | ||
152 | finish_list.sort_unstable_by(|a, b| { | |
153 | match (&a.state, &b.state) { | |
154 | (Some(s1), Some(s2)) => s1.cmp(&s2), | |
155 | (Some(_), None) => std::cmp::Ordering::Less, | |
156 | (None, Some(_)) => std::cmp::Ordering::Greater, | |
157 | _ => a.upid.starttime.cmp(&b.upid.starttime), | |
158 | } | |
159 | }); | |
160 | ||
161 | if !finish_list.is_empty() { | |
162 | let options = self.file_opts.clone() | |
163 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); | |
164 | ||
165 | let mut writer = atomic_open_or_create_file( | |
166 | &self.task_archive_fn, | |
167 | OFlag::O_APPEND | OFlag::O_RDWR, | |
168 | &[], | |
169 | options, | |
e0a19d33 | 170 | false, |
0a33fba4 DM |
171 | )?; |
172 | for info in &finish_list { | |
173 | writer.write_all(render_task_line(&info).as_bytes())?; | |
174 | } | |
175 | } | |
176 | ||
177 | // TODO Remove with 1.x | |
178 | // for compatibility, if we had an INDEX file, we do not need it anymore | |
179 | if had_index_file { | |
180 | let _ = nix::unistd::unlink(&self.task_index_fn); | |
181 | } | |
182 | ||
183 | drop(lock); | |
184 | ||
185 | Ok(()) | |
186 | } | |
187 | ||
188 | // Create task log directory with correct permissions | |
189 | fn create_task_log_dirs(&self) -> Result<(), Error> { | |
190 | ||
191 | try_block!({ | |
192 | let dir_opts = self.file_opts.clone() | |
193 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); | |
194 | ||
195 | create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?; | |
196 | // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?; | |
197 | Ok(()) | |
198 | }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err)) | |
199 | } | |
200 | } | |
201 | ||
202 | /// Initialize the WorkerTask library | |
203 | pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> { | |
204 | let setup = WorkerTaskSetup::new(basedir, file_opts); | |
205 | setup.create_task_log_dirs()?; | |
206 | WORKER_TASK_SETUP.set(setup) | |
207 | .map_err(|_| format_err!("init_worker_tasks failed - already initialized")) | |
208 | } | |
209 | ||
210 | /// checks if the Task Archive is bigger that 'size_threshold' bytes, and | |
211 | /// rotates it if it is | |
d5790a9f DM |
212 | pub fn rotate_task_log_archive( |
213 | size_threshold: u64, | |
214 | compress: bool, | |
215 | max_files: Option<usize>, | |
216 | options: Option<CreateOptions>, | |
217 | ) -> Result<bool, Error> { | |
0a33fba4 DM |
218 | |
219 | let setup = worker_task_setup()?; | |
220 | ||
221 | let _lock = setup.lock_task_list_files(true)?; | |
222 | ||
d5790a9f DM |
223 | let mut logrotate = LogRotate::new( |
224 | &setup.task_archive_fn, | |
225 | compress, | |
226 | max_files, | |
227 | options, | |
228 | )?; | |
0a33fba4 | 229 | |
d5790a9f | 230 | logrotate.rotate(size_threshold) |
0a33fba4 DM |
231 | } |
232 | ||
de55fff2 DC |
233 | /// removes all task logs that are older than the oldest task entry in the |
234 | /// task archive | |
235 | pub fn cleanup_old_tasks(compressed: bool) -> Result<(), Error> { | |
236 | let setup = worker_task_setup()?; | |
237 | ||
238 | let _lock = setup.lock_task_list_files(true)?; | |
239 | ||
d5790a9f DM |
240 | let logrotate = LogRotate::new( |
241 | &setup.task_archive_fn, | |
242 | compressed, | |
243 | None, | |
244 | None, | |
245 | )?; | |
de55fff2 DC |
246 | |
247 | let mut timestamp = None; | |
248 | if let Some(last_file) = logrotate.files().last() { | |
249 | let reader = BufReader::new(last_file); | |
250 | for line in reader.lines() { | |
251 | let line = line?; | |
252 | if let Ok((_, _, Some(state))) = parse_worker_status_line(&line) { | |
253 | timestamp = Some(state.endtime()); | |
254 | break; | |
255 | } | |
256 | } | |
257 | } | |
258 | ||
259 | fn get_modified(entry: std::fs::DirEntry) -> Result<SystemTime, std::io::Error> { | |
260 | entry.metadata()?.modified() | |
261 | } | |
262 | ||
263 | if let Some(timestamp) = timestamp { | |
264 | let cutoff_time = if timestamp > 0 { | |
265 | SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(timestamp as u64)) | |
266 | } else { | |
267 | SystemTime::UNIX_EPOCH.checked_sub(Duration::from_secs(-timestamp as u64)) | |
268 | }.ok_or_else(|| format_err!("could not calculate cutoff time"))?; | |
269 | ||
270 | for i in 0..256 { | |
271 | let mut path = setup.taskdir.clone(); | |
272 | path.push(format!("{:02X}", i)); | |
273 | for file in std::fs::read_dir(path)? { | |
274 | let file = file?; | |
275 | let path = file.path(); | |
276 | ||
277 | let modified = get_modified(file) | |
278 | .map_err(|err| format_err!("error getting mtime for {:?}: {}", path, err))?; | |
279 | ||
280 | if modified < cutoff_time { | |
281 | match std::fs::remove_file(path) { | |
282 | Ok(()) => {}, | |
283 | Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}, | |
284 | Err(err) => bail!("could not remove file: {}", err), | |
285 | } | |
286 | } | |
287 | } | |
288 | } | |
289 | } | |
290 | ||
291 | Ok(()) | |
292 | } | |
293 | ||
0a33fba4 DM |
294 | |
295 | /// Path to the worker log file | |
296 | pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> { | |
297 | let setup = worker_task_setup()?; | |
298 | Ok(setup.log_path(upid)) | |
299 | } | |
300 | ||
301 | /// Read endtime (time of last log line) and exitstatus from task log file | |
302 | /// If there is not a single line with at valid datetime, we assume the | |
303 | /// starttime to be the endtime | |
304 | pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> { | |
305 | ||
306 | let setup = worker_task_setup()?; | |
307 | ||
308 | let mut status = TaskState::Unknown { endtime: upid.starttime }; | |
309 | ||
310 | let path = setup.log_path(upid); | |
311 | ||
312 | let mut file = File::open(path)?; | |
313 | ||
314 | /// speedup - only read tail | |
315 | use std::io::Seek; | |
316 | use std::io::SeekFrom; | |
317 | let _ = file.seek(SeekFrom::End(-8192)); // ignore errors | |
318 | ||
319 | let mut data = Vec::with_capacity(8192); | |
320 | file.read_to_end(&mut data)?; | |
321 | ||
322 | // strip newlines at the end of the task logs | |
323 | while data.last() == Some(&b'\n') { | |
324 | data.pop(); | |
325 | } | |
326 | ||
327 | let last_line = match data.iter().rposition(|c| *c == b'\n') { | |
328 | Some(start) if data.len() > (start+1) => &data[start+1..], | |
329 | Some(_) => &data, // should not happen, since we removed all trailing newlines | |
330 | None => &data, | |
331 | }; | |
332 | ||
333 | let last_line = std::str::from_utf8(last_line) | |
334 | .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?; | |
335 | ||
336 | let mut iter = last_line.splitn(2, ": "); | |
337 | if let Some(time_str) = iter.next() { | |
6ef1b649 | 338 | if let Ok(endtime) = proxmox_time::parse_rfc3339(time_str) { |
0a33fba4 DM |
339 | // set the endtime even if we cannot parse the state |
340 | status = TaskState::Unknown { endtime }; | |
341 | if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) { | |
342 | if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) { | |
343 | status = state; | |
344 | } | |
345 | } | |
346 | } | |
347 | } | |
348 | ||
349 | Ok(status) | |
346a488e | 350 | } |
784fa1c2 | 351 | |
479f6e40 DM |
352 | lazy_static! { |
353 | static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new()); | |
a68768cf | 354 | } |
d607b886 | 355 | |
a68768cf TL |
356 | /// checks if the task UPID refers to a worker from this process |
357 | fn is_local_worker(upid: &UPID) -> bool { | |
b9700a9f | 358 | upid.pid == crate::pid() && upid.pstart == crate::pstart() |
479f6e40 DM |
359 | } |
360 | ||
634132fe | 361 | /// Test if the task is still running |
5751e495 | 362 | pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> { |
a68768cf | 363 | if is_local_worker(upid) { |
5751e495 DM |
364 | return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)); |
365 | } | |
366 | ||
3984a5fd | 367 | if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() { |
5751e495 DM |
368 | return Ok(false); |
369 | } | |
370 | ||
b9700a9f | 371 | let sock = crate::ctrl_sock_from_pid(upid.pid); |
5751e495 | 372 | let cmd = json!({ |
a68768cf | 373 | "command": "worker-task-status", |
385681c9 TL |
374 | "args": { |
375 | "upid": upid.to_string(), | |
376 | }, | |
5751e495 | 377 | }); |
b9700a9f | 378 | let status = crate::send_command(sock, &cmd).await?; |
4494d078 | 379 | |
5751e495 DM |
380 | if let Some(active) = status.as_bool() { |
381 | Ok(active) | |
382 | } else { | |
383 | bail!("got unexpected result {:?} (expected bool)", status); | |
384 | } | |
385 | } | |
386 | ||
387 | /// Test if the task is still running (fast but inaccurate implementation) | |
388 | /// | |
a68768cf | 389 | /// If the task is spawned from a different process, we simply return if |
5751e495 DM |
390 | /// that process is still running. This information is good enough to detect |
391 | /// stale tasks... | |
77ebbefc | 392 | pub fn worker_is_active_local(upid: &UPID) -> bool { |
a68768cf | 393 | if is_local_worker(upid) { |
62ee2eb4 | 394 | WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) |
634132fe | 395 | } else { |
62ee2eb4 | 396 | procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() |
479f6e40 DM |
397 | } |
398 | } | |
399 | ||
49e25688 | 400 | /// Register task control command on a [CommandSocket]. |
2e44983a DM |
401 | /// |
402 | /// This create two commands: | |
403 | /// | |
404 | /// * ``worker-task-abort <UPID>``: calls [abort_local_worker] | |
405 | /// | |
406 | /// * ``worker-task-status <UPID>``: return true of false, depending on | |
407 | /// whether the worker is running or stopped. | |
a68768cf | 408 | pub fn register_task_control_commands( |
49e25688 | 409 | commando_sock: &mut CommandSocket, |
a68768cf TL |
410 | ) -> Result<(), Error> { |
411 | fn get_upid(args: Option<&Value>) -> Result<UPID, Error> { | |
412 | let args = if let Some(args) = args { args } else { bail!("missing args") }; | |
413 | let upid = match args.get("upid") { | |
414 | Some(Value::String(upid)) => upid.parse::<UPID>()?, | |
415 | None => bail!("no upid in args"), | |
416 | _ => bail!("unable to parse upid"), | |
417 | }; | |
418 | if !is_local_worker(&upid) { | |
d607b886 DM |
419 | bail!("upid does not belong to this process"); |
420 | } | |
a68768cf TL |
421 | Ok(upid) |
422 | } | |
d607b886 | 423 | |
a68768cf TL |
424 | commando_sock.register_command("worker-task-abort".into(), move |args| { |
425 | let upid = get_upid(args)?; | |
426 | ||
3f742f95 DC |
427 | abort_local_worker(upid); |
428 | ||
a68768cf | 429 | Ok(Value::Null) |
d607b886 | 430 | })?; |
a68768cf TL |
431 | commando_sock.register_command("worker-task-status".into(), move |args| { |
432 | let upid = get_upid(args)?; | |
d607b886 | 433 | |
a68768cf TL |
434 | let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id); |
435 | ||
436 | Ok(active.into()) | |
437 | })?; | |
d607b886 DM |
438 | |
439 | Ok(()) | |
440 | } | |
441 | ||
2e44983a DM |
442 | /// Try to abort a worker task, but do no wait |
443 | /// | |
444 | /// Errors (if any) are simply logged. | |
445 | pub fn abort_worker_nowait(upid: UPID) { | |
75fef4b4 WB |
446 | tokio::spawn(async move { |
447 | if let Err(err) = abort_worker(upid).await { | |
2e44983a | 448 | log::error!("abort worker task failed - {}", err); |
321070b4 | 449 | } |
75fef4b4 | 450 | }); |
321070b4 DM |
451 | } |
452 | ||
2e44983a DM |
453 | /// Abort a worker task |
454 | /// | |
455 | /// By sending ``worker-task-abort`` to the control socket. | |
5751e495 | 456 | pub async fn abort_worker(upid: UPID) -> Result<(), Error> { |
321070b4 | 457 | |
b9700a9f | 458 | let sock = crate::ctrl_sock_from_pid(upid.pid); |
321070b4 | 459 | let cmd = json!({ |
a68768cf | 460 | "command": "worker-task-abort", |
385681c9 TL |
461 | "args": { |
462 | "upid": upid.to_string(), | |
463 | }, | |
321070b4 | 464 | }); |
b9700a9f | 465 | crate::send_command(sock, &cmd).map_ok(|_| ()).await |
321070b4 DM |
466 | } |
467 | ||
77bd2a46 | 468 | fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> { |
4b01c983 DM |
469 | |
470 | let data = line.splitn(3, ' ').collect::<Vec<&str>>(); | |
471 | ||
472 | let len = data.len(); | |
473 | ||
474 | match len { | |
475 | 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)), | |
476 | 3 => { | |
477 | let endtime = i64::from_str_radix(data[1], 16)?; | |
77bd2a46 DC |
478 | let state = TaskState::from_endtime_and_message(endtime, data[2])?; |
479 | Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state))) | |
4b01c983 DM |
480 | } |
481 | _ => bail!("wrong number of components"), | |
482 | } | |
483 | } | |
484 | ||
4c116baf | 485 | /// Task State |
77bd2a46 | 486 | #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] |
4c116baf DC |
487 | pub enum TaskState { |
488 | /// The Task ended with an undefined state | |
77bd2a46 | 489 | Unknown { endtime: i64 }, |
4c116baf | 490 | /// The Task ended and there were no errors or warnings |
77bd2a46 | 491 | OK { endtime: i64 }, |
4c116baf | 492 | /// The Task had 'count' amount of warnings and no errors |
77bd2a46 | 493 | Warning { count: u64, endtime: i64 }, |
4c116baf | 494 | /// The Task ended with the error described in 'message' |
77bd2a46 | 495 | Error { message: String, endtime: i64 }, |
4c116baf DC |
496 | } |
497 | ||
498 | impl TaskState { | |
77bd2a46 DC |
499 | pub fn endtime(&self) -> i64 { |
500 | match *self { | |
501 | TaskState::Unknown { endtime } => endtime, | |
502 | TaskState::OK { endtime } => endtime, | |
503 | TaskState::Warning { endtime, .. } => endtime, | |
504 | TaskState::Error { endtime, .. } => endtime, | |
4c116baf DC |
505 | } |
506 | } | |
4c116baf | 507 | |
77bd2a46 | 508 | fn result_text(&self) -> String { |
4c116baf | 509 | match self { |
77bd2a46 DC |
510 | TaskState::Error { message, .. } => format!("TASK ERROR: {}", message), |
511 | other => format!("TASK {}", other), | |
4c116baf DC |
512 | } |
513 | } | |
4c116baf | 514 | |
77bd2a46 | 515 | fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> { |
4c116baf | 516 | if s == "unknown" { |
77bd2a46 | 517 | Ok(TaskState::Unknown { endtime }) |
4c116baf | 518 | } else if s == "OK" { |
77bd2a46 | 519 | Ok(TaskState::OK { endtime }) |
365915da FG |
520 | } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") { |
521 | let count: u64 = warnings.parse()?; | |
77bd2a46 | 522 | Ok(TaskState::Warning{ count, endtime }) |
3984a5fd | 523 | } else if !s.is_empty() { |
365915da | 524 | let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string(); |
77bd2a46 | 525 | Ok(TaskState::Error{ message, endtime }) |
4c116baf DC |
526 | } else { |
527 | bail!("unable to parse Task Status '{}'", s); | |
528 | } | |
529 | } | |
530 | } | |
531 | ||
77bd2a46 DC |
532 | impl std::cmp::PartialOrd for TaskState { |
533 | fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { | |
534 | Some(self.endtime().cmp(&other.endtime())) | |
535 | } | |
536 | } | |
537 | ||
538 | impl std::cmp::Ord for TaskState { | |
539 | fn cmp(&self, other: &Self) -> std::cmp::Ordering { | |
540 | self.endtime().cmp(&other.endtime()) | |
541 | } | |
542 | } | |
543 | ||
544 | impl std::fmt::Display for TaskState { | |
545 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
546 | match self { | |
547 | TaskState::Unknown { .. } => write!(f, "unknown"), | |
548 | TaskState::OK { .. }=> write!(f, "OK"), | |
549 | TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count), | |
550 | TaskState::Error { message, .. } => write!(f, "{}", message), | |
551 | } | |
552 | } | |
553 | } | |
554 | ||
93aebb38 DM |
555 | /// Task details including parsed UPID |
556 | /// | |
557 | /// If there is no `state`, the task is still running. | |
558 | #[derive(Debug)] | |
559 | pub struct TaskListInfo { | |
560 | /// The parsed UPID | |
561 | pub upid: UPID, | |
562 | /// UPID string representation | |
563 | pub upid_str: String, | |
564 | /// Task `(endtime, status)` if already finished | |
77bd2a46 | 565 | pub state: Option<TaskState>, // endtime, status |
93aebb38 DM |
566 | } |
567 | ||
bbeb0256 DC |
568 | fn render_task_line(info: &TaskListInfo) -> String { |
569 | let mut raw = String::new(); | |
570 | if let Some(status) = &info.state { | |
571 | raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status)); | |
572 | } else { | |
573 | raw.push_str(&info.upid_str); | |
574 | raw.push('\n'); | |
575 | } | |
576 | ||
577 | raw | |
578 | } | |
579 | ||
580 | fn render_task_list(list: &[TaskListInfo]) -> String { | |
581 | let mut raw = String::new(); | |
582 | for info in list { | |
583 | raw.push_str(&render_task_line(&info)); | |
584 | } | |
585 | raw | |
586 | } | |
587 | ||
784fa1c2 DC |
588 | // note this is not locked, caller has to make sure it is |
589 | // this will skip (and log) lines that are not valid status lines | |
590 | fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error> | |
591 | { | |
592 | let reader = BufReader::new(reader); | |
593 | let mut list = Vec::new(); | |
594 | for line in reader.lines() { | |
595 | let line = line?; | |
596 | match parse_worker_status_line(&line) { | |
597 | Ok((upid_str, upid, state)) => list.push(TaskListInfo { | |
598 | upid_str, | |
599 | upid, | |
600 | state | |
601 | }), | |
602 | Err(err) => { | |
2e44983a | 603 | log::warn!("unable to parse worker status '{}' - {}", line, err); |
784fa1c2 DC |
604 | continue; |
605 | } | |
606 | }; | |
607 | } | |
608 | ||
609 | Ok(list) | |
610 | } | |
611 | ||
612 | // note this is not locked, caller has to make sure it is | |
613 | fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error> | |
614 | where | |
615 | P: AsRef<std::path::Path> + std::fmt::Debug, | |
616 | { | |
617 | let file = match File::open(&path) { | |
618 | Ok(f) => f, | |
619 | Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()), | |
620 | Err(err) => bail!("unable to open task list {:?} - {}", path, err), | |
621 | }; | |
622 | ||
623 | read_task_file(file) | |
624 | } | |
625 | ||
2e44983a | 626 | /// Iterate over existing/active worker tasks |
e7244387 DC |
627 | pub struct TaskListInfoIterator { |
628 | list: VecDeque<TaskListInfo>, | |
264779e7 | 629 | end: bool, |
e7244387 | 630 | archive: Option<LogRotateFiles>, |
0a33fba4 | 631 | lock: Option<TaskListLockGuard>, |
e7244387 DC |
632 | } |
633 | ||
634 | impl TaskListInfoIterator { | |
2e44983a | 635 | /// Creates a new iterator instance. |
e7244387 | 636 | pub fn new(active_only: bool) -> Result<Self, Error> { |
0a33fba4 DM |
637 | |
638 | let setup = worker_task_setup()?; | |
639 | ||
e7244387 | 640 | let (read_lock, active_list) = { |
0a33fba4 DM |
641 | let lock = setup.lock_task_list_files(false)?; |
642 | let active_list = read_task_file_from_path(&setup.active_tasks_fn)?; | |
e7244387 DC |
643 | |
644 | let needs_update = active_list | |
645 | .iter() | |
df4827f2 | 646 | .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid)); |
e7244387 | 647 | |
264779e7 | 648 | // TODO remove with 1.x |
0a33fba4 | 649 | let index_exists = setup.task_index_fn.is_file(); |
264779e7 DC |
650 | |
651 | if needs_update || index_exists { | |
e7244387 | 652 | drop(lock); |
0a33fba4 DM |
653 | setup.update_active_workers(None)?; |
654 | let lock = setup.lock_task_list_files(false)?; | |
655 | let active_list = read_task_file_from_path(&setup.active_tasks_fn)?; | |
e7244387 DC |
656 | (lock, active_list) |
657 | } else { | |
658 | (lock, active_list) | |
659 | } | |
660 | }; | |
661 | ||
662 | let archive = if active_only { | |
663 | None | |
664 | } else { | |
d5790a9f | 665 | let logrotate = LogRotate::new(&setup.task_archive_fn, true, None, None)?; |
e7244387 DC |
666 | Some(logrotate.files()) |
667 | }; | |
668 | ||
e7244387 DC |
669 | let lock = if active_only { None } else { Some(read_lock) }; |
670 | ||
671 | Ok(Self { | |
672 | list: active_list.into(), | |
264779e7 | 673 | end: active_only, |
e7244387 DC |
674 | archive, |
675 | lock, | |
676 | }) | |
677 | } | |
678 | } | |
679 | ||
680 | impl Iterator for TaskListInfoIterator { | |
681 | type Item = Result<TaskListInfo, Error>; | |
682 | ||
683 | fn next(&mut self) -> Option<Self::Item> { | |
684 | loop { | |
685 | if let Some(element) = self.list.pop_back() { | |
686 | return Some(Ok(element)); | |
264779e7 DC |
687 | } else if self.end { |
688 | return None; | |
e7244387 | 689 | } else { |
264779e7 DC |
690 | if let Some(mut archive) = self.archive.take() { |
691 | if let Some(file) = archive.next() { | |
692 | let list = match read_task_file(file) { | |
693 | Ok(list) => list, | |
e7244387 DC |
694 | Err(err) => return Some(Err(err)), |
695 | }; | |
264779e7 DC |
696 | self.list.append(&mut list.into()); |
697 | self.archive = Some(archive); | |
698 | continue; | |
e7244387 | 699 | } |
e7244387 | 700 | } |
264779e7 DC |
701 | |
702 | self.end = true; | |
703 | self.lock.take(); | |
e7244387 DC |
704 | } |
705 | } | |
706 | } | |
707 | } | |
708 | ||
882594c5 DM |
709 | /// Launch long running worker tasks. |
710 | /// | |
711 | /// A worker task can either be a whole thread, or a simply tokio | |
712 | /// task/future. Each task can `log()` messages, which are stored | |
713 | /// persistently to files. Task should poll the `abort_requested` | |
714 | /// flag, and stop execution when requested. | |
479f6e40 | 715 | pub struct WorkerTask { |
0a33fba4 | 716 | setup: &'static WorkerTaskSetup, |
479f6e40 DM |
717 | upid: UPID, |
718 | data: Mutex<WorkerTaskData>, | |
719 | abort_requested: AtomicBool, | |
720 | } | |
721 | ||
722 | impl std::fmt::Display for WorkerTask { | |
723 | ||
724 | fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { | |
725 | self.upid.fmt(f) | |
726 | } | |
727 | } | |
728 | ||
479f6e40 DM |
729 | struct WorkerTaskData { |
730 | logger: FileLogger, | |
731 | progress: f64, // 0..1 | |
f6de2c73 | 732 | warn_count: u64, |
75bc49be | 733 | pub abort_listeners: Vec<oneshot::Sender<()>>, |
479f6e40 DM |
734 | } |
735 | ||
479f6e40 DM |
736 | impl WorkerTask { |
737 | ||
0a33fba4 DM |
738 | pub fn new( |
739 | worker_type: &str, | |
740 | worker_id: Option<String>, | |
741 | auth_id: String, | |
742 | to_stdout: bool, | |
743 | ) -> Result<Arc<Self>, Error> { | |
744 | ||
745 | let setup = worker_task_setup()?; | |
746 | ||
e6dc35ac | 747 | let upid = UPID::new(worker_type, worker_id, auth_id)?; |
634132fe | 748 | let task_id = upid.task_id; |
479f6e40 | 749 | |
0a33fba4 | 750 | let mut path = setup.taskdir.clone(); |
35950380 | 751 | |
7f3d9100 | 752 | path.push(format!("{:02X}", upid.pstart & 255)); |
479f6e40 | 753 | |
0a33fba4 DM |
754 | let dir_opts = setup.file_opts.clone() |
755 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); | |
35950380 | 756 | |
0a33fba4 | 757 | create_path(&path, None, Some(dir_opts))?; |
479f6e40 DM |
758 | |
759 | path.push(upid.to_string()); | |
760 | ||
c0df91f8 | 761 | let logger_options = FileLogOptions { |
913dddea | 762 | to_stdout, |
c0df91f8 | 763 | exclusive: true, |
e5adbc34 | 764 | prefix_time: true, |
c0df91f8 | 765 | read: true, |
0a33fba4 | 766 | file_opts: setup.file_opts.clone(), |
c0df91f8 TL |
767 | ..Default::default() |
768 | }; | |
769 | let logger = FileLogger::new(&path, logger_options)?; | |
479f6e40 DM |
770 | |
771 | let worker = Arc::new(Self { | |
0a33fba4 | 772 | setup, |
05d755b2 | 773 | upid: upid.clone(), |
479f6e40 DM |
774 | abort_requested: AtomicBool::new(false), |
775 | data: Mutex::new(WorkerTaskData { | |
776 | logger, | |
777 | progress: 0.0, | |
f6de2c73 | 778 | warn_count: 0, |
75bc49be | 779 | abort_listeners: vec![], |
479f6e40 DM |
780 | }), |
781 | }); | |
782 | ||
05d755b2 DC |
783 | // scope to drop the lock again after inserting |
784 | { | |
785 | let mut hash = WORKER_TASK_LIST.lock().unwrap(); | |
786 | hash.insert(task_id, worker.clone()); | |
b9700a9f | 787 | crate::set_worker_count(hash.len()); |
05d755b2 | 788 | } |
7a630df7 | 789 | |
0a33fba4 | 790 | setup.update_active_workers(Some(&upid))?; |
479f6e40 DM |
791 | |
792 | Ok(worker) | |
793 | } | |
794 | ||
882594c5 | 795 | /// Spawn a new tokio task/future. |
660c6846 DM |
796 | pub fn spawn<F, T>( |
797 | worker_type: &str, | |
798 | worker_id: Option<String>, | |
049a22a3 | 799 | auth_id: String, |
660c6846 DM |
800 | to_stdout: bool, |
801 | f: F, | |
802 | ) -> Result<String, Error> | |
479f6e40 | 803 | where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T, |
75fef4b4 | 804 | T: Send + 'static + Future<Output = Result<(), Error>>, |
479f6e40 | 805 | { |
e6dc35ac | 806 | let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?; |
660c6846 | 807 | let upid_str = worker.upid.to_string(); |
75fef4b4 WB |
808 | let f = f(worker.clone()); |
809 | tokio::spawn(async move { | |
810 | let result = f.await; | |
dd8e744f | 811 | worker.log_result(&result); |
75fef4b4 | 812 | }); |
479f6e40 | 813 | |
660c6846 | 814 | Ok(upid_str) |
479f6e40 DM |
815 | } |
816 | ||
882594c5 | 817 | /// Create a new worker thread. |
660c6846 DM |
818 | pub fn new_thread<F>( |
819 | worker_type: &str, | |
820 | worker_id: Option<String>, | |
049a22a3 | 821 | auth_id: String, |
660c6846 DM |
822 | to_stdout: bool, |
823 | f: F, | |
824 | ) -> Result<String, Error> | |
d3f4c08f | 825 | where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error> |
479f6e40 | 826 | { |
e6dc35ac | 827 | let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?; |
660c6846 | 828 | let upid_str = worker.upid.to_string(); |
479f6e40 | 829 | |
217170e1 | 830 | let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || { |
d3f4c08f DM |
831 | let worker1 = worker.clone(); |
832 | let result = match std::panic::catch_unwind(move || f(worker1)) { | |
833 | Ok(r) => r, | |
834 | Err(panic) => { | |
835 | match panic.downcast::<&str>() { | |
836 | Ok(panic_msg) => { | |
837 | Err(format_err!("worker panicked: {}", panic_msg)) | |
838 | } | |
839 | Err(_) => { | |
840 | Err(format_err!("worker panicked: unknown type.")) | |
841 | } | |
842 | } | |
843 | } | |
844 | }; | |
845 | ||
dd8e744f | 846 | worker.log_result(&result); |
479f6e40 DM |
847 | }); |
848 | ||
660c6846 | 849 | Ok(upid_str) |
479f6e40 DM |
850 | } |
851 | ||
4c116baf DC |
852 | /// create state from self and a result |
853 | pub fn create_state(&self, result: &Result<(), Error>) -> TaskState { | |
f6de2c73 | 854 | let warn_count = self.data.lock().unwrap().warn_count; |
cef03f41 | 855 | |
6ef1b649 | 856 | let endtime = proxmox_time::epoch_i64(); |
77bd2a46 | 857 | |
4b01c983 | 858 | if let Err(err) = result { |
77bd2a46 | 859 | TaskState::Error { message: err.to_string(), endtime } |
f6de2c73 | 860 | } else if warn_count > 0 { |
77bd2a46 | 861 | TaskState::Warning { count: warn_count, endtime } |
4b01c983 | 862 | } else { |
77bd2a46 | 863 | TaskState::OK { endtime } |
4b01c983 | 864 | } |
cef03f41 DC |
865 | } |
866 | ||
867 | /// Log task result, remove task from running list | |
868 | pub fn log_result(&self, result: &Result<(), Error>) { | |
4c116baf | 869 | let state = self.create_state(result); |
1ec0d70d | 870 | self.log_message(state.result_text()); |
418def7a DM |
871 | |
872 | WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id); | |
0a33fba4 | 873 | let _ = self.setup.update_active_workers(None); |
b9700a9f | 874 | crate::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len()); |
4b01c983 DM |
875 | } |
876 | ||
882594c5 | 877 | /// Log a message. |
1ec0d70d | 878 | pub fn log_message<S: AsRef<str>>(&self, msg: S) { |
479f6e40 DM |
879 | let mut data = self.data.lock().unwrap(); |
880 | data.logger.log(msg); | |
881 | } | |
882 | ||
f6de2c73 | 883 | /// Log a message as warning. |
1ec0d70d | 884 | pub fn log_warning<S: AsRef<str>>(&self, msg: S) { |
f6de2c73 DC |
885 | let mut data = self.data.lock().unwrap(); |
886 | data.logger.log(format!("WARN: {}", msg.as_ref())); | |
887 | data.warn_count += 1; | |
888 | } | |
889 | ||
882594c5 | 890 | /// Set progress indicator |
479f6e40 DM |
891 | pub fn progress(&self, progress: f64) { |
892 | if progress >= 0.0 && progress <= 1.0 { | |
893 | let mut data = self.data.lock().unwrap(); | |
894 | data.progress = progress; | |
895 | } else { | |
896 | // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress); | |
897 | } | |
898 | } | |
899 | ||
882594c5 | 900 | /// Request abort |
d607b886 | 901 | pub fn request_abort(&self) { |
a6c16894 DM |
902 | let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst); |
903 | if !prev_abort { // log abort one time | |
1ec0d70d | 904 | self.log_message(format!("received abort request ...")); |
a6c16894 | 905 | } |
75bc49be DM |
906 | // noitify listeners |
907 | let mut data = self.data.lock().unwrap(); | |
908 | loop { | |
909 | match data.abort_listeners.pop() { | |
910 | None => { break; }, | |
911 | Some(ch) => { | |
d1d74c43 | 912 | let _ = ch.send(()); // ignore errors here |
75bc49be DM |
913 | }, |
914 | } | |
915 | } | |
479f6e40 DM |
916 | } |
917 | ||
75bc49be DM |
918 | /// Get a future which resolves on task abort |
919 | pub fn abort_future(&self) -> oneshot::Receiver<()> { | |
920 | let (tx, rx) = oneshot::channel::<()>(); | |
921 | ||
922 | let mut data = self.data.lock().unwrap(); | |
923 | if self.abort_requested() { | |
924 | let _ = tx.send(()); | |
925 | } else { | |
926 | data.abort_listeners.push(tx); | |
927 | } | |
928 | rx | |
929 | } | |
4bd2a9e4 DC |
930 | |
931 | pub fn upid(&self) -> &UPID { | |
932 | &self.upid | |
933 | } | |
479f6e40 | 934 | } |
d1993187 | 935 | |
c8449217 | 936 | impl WorkerTaskContext for WorkerTask { |
619cd5cb DM |
937 | |
938 | fn abort_requested(&self) -> bool { | |
939 | self.abort_requested.load(Ordering::SeqCst) | |
d1993187 WB |
940 | } |
941 | ||
0fd55b08 DM |
942 | fn shutdown_requested(&self) -> bool { |
943 | crate::shutdown_requested() | |
944 | } | |
945 | ||
946 | fn fail_on_shutdown(&self) -> Result<(), Error> { | |
947 | crate::fail_on_shutdown() | |
948 | } | |
949 | ||
d1993187 WB |
950 | fn log(&self, level: log::Level, message: &std::fmt::Arguments) { |
951 | match level { | |
1ec0d70d DM |
952 | log::Level::Error => self.log_warning(&message.to_string()), |
953 | log::Level::Warn => self.log_warning(&message.to_string()), | |
954 | log::Level::Info => self.log_message(&message.to_string()), | |
955 | log::Level::Debug => self.log_message(&format!("DEBUG: {}", message)), | |
956 | log::Level::Trace => self.log_message(&format!("TRACE: {}", message)), | |
d1993187 WB |
957 | } |
958 | } | |
959 | } | |
72fbe9ff WB |
960 | |
961 | /// Wait for a locally spanned worker task | |
962 | /// | |
963 | /// Note: local workers should print logs to stdout, so there is no | |
964 | /// need to fetch/display logs. We just wait for the worker to finish. | |
965 | pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> { | |
966 | ||
967 | let upid: UPID = upid_str.parse()?; | |
968 | ||
969 | let sleep_duration = core::time::Duration::new(0, 100_000_000); | |
970 | ||
971 | loop { | |
972 | if worker_is_active_local(&upid) { | |
973 | tokio::time::sleep(sleep_duration).await; | |
974 | } else { | |
975 | break; | |
976 | } | |
977 | } | |
978 | Ok(()) | |
979 | } | |
3f742f95 DC |
980 | |
981 | /// Request abort of a local worker (if existing and running) | |
982 | pub fn abort_local_worker(upid: UPID) { | |
983 | if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) { | |
984 | worker.request_abort(); | |
985 | } | |
986 | } |