]>
Commit | Line | Data |
---|---|---|
e7244387 | 1 | use std::collections::{HashMap, VecDeque}; |
4b01c983 | 2 | use std::fs::File; |
0a33fba4 | 3 | use std::path::PathBuf; |
5ade6c25 | 4 | use std::io::{Read, Write, BufRead, BufReader}; |
d3f4c08f | 5 | use std::panic::UnwindSafe; |
18c0df4c WB |
6 | use std::sync::atomic::{AtomicBool, Ordering}; |
7 | use std::sync::{Arc, Mutex}; | |
de55fff2 | 8 | use std::time::{SystemTime, Duration}; |
d3f4c08f | 9 | |
f7d4e4b5 | 10 | use anyhow::{bail, format_err, Error}; |
18c0df4c WB |
11 | use futures::*; |
12 | use lazy_static::lazy_static; | |
321070b4 | 13 | use serde_json::{json, Value}; |
4c116baf | 14 | use serde::{Serialize, Deserialize}; |
18c0df4c | 15 | use tokio::sync::oneshot; |
0a33fba4 DM |
16 | use nix::fcntl::OFlag; |
17 | use once_cell::sync::OnceCell; | |
479f6e40 | 18 | |
619495b2 | 19 | use proxmox::sys::linux::procfs; |
0a33fba4 | 20 | use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions}; |
6ef1b649 WB |
21 | use proxmox_lang::try_block; |
22 | use proxmox_schema::upid::UPID; | |
e18a6c9e | 23 | |
c8449217 | 24 | use pbs_tools::task::WorkerTaskContext; |
6c76aa43 | 25 | use pbs_tools::logrotate::{LogRotate, LogRotateFiles}; |
b9700a9f | 26 | |
49e25688 | 27 | use crate::{CommandSocket, FileLogger, FileLogOptions}; |
6c76aa43 | 28 | |
/// Guard object for the task list lock.
///
/// Wraps the open lock file handed out by `WorkerTaskSetup::lock_task_list_files`.
struct TaskListLockGuard(File);
af06decd | 30 | |
0a33fba4 DM |
31 | struct WorkerTaskSetup { |
32 | file_opts: CreateOptions, | |
33 | taskdir: PathBuf, | |
34 | task_lock_fn: PathBuf, | |
35 | active_tasks_fn: PathBuf, | |
36 | task_index_fn: PathBuf, | |
37 | task_archive_fn: PathBuf, | |
38 | } | |
39 | ||
40 | static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new(); | |
41 | ||
42 | fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> { | |
43 | WORKER_TASK_SETUP.get() | |
44 | .ok_or_else(|| format_err!("WorkerTask library is not initialized")) | |
45 | } | |
46 | ||
47 | impl WorkerTaskSetup { | |
48 | ||
49 | fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self { | |
50 | ||
51 | let mut taskdir = basedir.clone(); | |
52 | taskdir.push("tasks"); | |
53 | ||
54 | let mut task_lock_fn = taskdir.clone(); | |
55 | task_lock_fn.push(".active.lock"); | |
56 | ||
57 | let mut active_tasks_fn = taskdir.clone(); | |
58 | active_tasks_fn.push("active"); | |
59 | ||
60 | let mut task_index_fn = taskdir.clone(); | |
61 | task_index_fn.push("index"); | |
62 | ||
63 | let mut task_archive_fn = taskdir.clone(); | |
64 | task_archive_fn.push("archive"); | |
65 | ||
66 | Self { | |
67 | file_opts, | |
68 | taskdir, | |
69 | task_lock_fn, | |
70 | active_tasks_fn, | |
71 | task_index_fn, | |
72 | task_archive_fn, | |
73 | } | |
74 | } | |
75 | ||
76 | fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> { | |
77 | let options = self.file_opts.clone() | |
78 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); | |
79 | ||
80 | let timeout = std::time::Duration::new(10, 0); | |
81 | ||
82 | let file = proxmox::tools::fs::open_file_locked( | |
83 | &self.task_lock_fn, | |
84 | timeout, | |
85 | exclusive, | |
86 | options, | |
87 | )?; | |
88 | ||
89 | Ok(TaskListLockGuard(file)) | |
90 | } | |
91 | ||
92 | fn log_path(&self, upid: &UPID) -> std::path::PathBuf { | |
93 | let mut path = self.taskdir.clone(); | |
94 | path.push(format!("{:02X}", upid.pstart % 256)); | |
95 | path.push(upid.to_string()); | |
96 | path | |
97 | } | |
98 | ||
99 | // atomically read/update the task list, update status of finished tasks | |
100 | // new_upid is added to the list when specified. | |
101 | fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> { | |
102 | ||
103 | let lock = self.lock_task_list_files(true)?; | |
104 | ||
105 | // TODO remove with 1.x | |
106 | let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?; | |
107 | let had_index_file = !finish_list.is_empty(); | |
108 | ||
109 | // We use filter_map because one negative case wants to *move* the data into `finish_list`, | |
110 | // clippy doesn't quite catch this! | |
111 | #[allow(clippy::unnecessary_filter_map)] | |
112 | let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)? | |
113 | .into_iter() | |
114 | .filter_map(|info| { | |
115 | if info.state.is_some() { | |
116 | // this can happen when the active file still includes finished tasks | |
117 | finish_list.push(info); | |
118 | return None; | |
119 | } | |
120 | ||
121 | if !worker_is_active_local(&info.upid) { | |
122 | // println!("Detected stopped task '{}'", &info.upid_str); | |
6ef1b649 | 123 | let now = proxmox_time::epoch_i64(); |
0a33fba4 DM |
124 | let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now }); |
125 | finish_list.push(TaskListInfo { | |
126 | upid: info.upid, | |
127 | upid_str: info.upid_str, | |
128 | state: Some(status) | |
129 | }); | |
130 | return None; | |
131 | } | |
132 | ||
133 | Some(info) | |
134 | }).collect(); | |
135 | ||
136 | if let Some(upid) = new_upid { | |
137 | active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None }); | |
138 | } | |
139 | ||
140 | let active_raw = render_task_list(&active_list); | |
141 | ||
142 | let options = self.file_opts.clone() | |
143 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); | |
144 | ||
145 | replace_file( | |
146 | &self.active_tasks_fn, | |
147 | active_raw.as_bytes(), | |
148 | options, | |
149 | )?; | |
150 | ||
151 | finish_list.sort_unstable_by(|a, b| { | |
152 | match (&a.state, &b.state) { | |
153 | (Some(s1), Some(s2)) => s1.cmp(&s2), | |
154 | (Some(_), None) => std::cmp::Ordering::Less, | |
155 | (None, Some(_)) => std::cmp::Ordering::Greater, | |
156 | _ => a.upid.starttime.cmp(&b.upid.starttime), | |
157 | } | |
158 | }); | |
159 | ||
160 | if !finish_list.is_empty() { | |
161 | let options = self.file_opts.clone() | |
162 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); | |
163 | ||
164 | let mut writer = atomic_open_or_create_file( | |
165 | &self.task_archive_fn, | |
166 | OFlag::O_APPEND | OFlag::O_RDWR, | |
167 | &[], | |
168 | options, | |
169 | )?; | |
170 | for info in &finish_list { | |
171 | writer.write_all(render_task_line(&info).as_bytes())?; | |
172 | } | |
173 | } | |
174 | ||
175 | // TODO Remove with 1.x | |
176 | // for compatibility, if we had an INDEX file, we do not need it anymore | |
177 | if had_index_file { | |
178 | let _ = nix::unistd::unlink(&self.task_index_fn); | |
179 | } | |
180 | ||
181 | drop(lock); | |
182 | ||
183 | Ok(()) | |
184 | } | |
185 | ||
186 | // Create task log directory with correct permissions | |
187 | fn create_task_log_dirs(&self) -> Result<(), Error> { | |
188 | ||
189 | try_block!({ | |
190 | let dir_opts = self.file_opts.clone() | |
191 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); | |
192 | ||
193 | create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?; | |
194 | // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?; | |
195 | Ok(()) | |
196 | }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err)) | |
197 | } | |
198 | } | |
199 | ||
200 | /// Initialize the WorkerTask library | |
201 | pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> { | |
202 | let setup = WorkerTaskSetup::new(basedir, file_opts); | |
203 | setup.create_task_log_dirs()?; | |
204 | WORKER_TASK_SETUP.set(setup) | |
205 | .map_err(|_| format_err!("init_worker_tasks failed - already initialized")) | |
206 | } | |
207 | ||
208 | /// checks if the Task Archive is bigger that 'size_threshold' bytes, and | |
209 | /// rotates it if it is | |
210 | pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> { | |
211 | ||
212 | let setup = worker_task_setup()?; | |
213 | ||
214 | let _lock = setup.lock_task_list_files(true)?; | |
215 | ||
216 | let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress) | |
217 | .ok_or_else(|| format_err!("could not get archive file names"))?; | |
218 | ||
219 | logrotate.rotate(size_threshold, None, max_files) | |
220 | } | |
221 | ||
de55fff2 DC |
222 | /// removes all task logs that are older than the oldest task entry in the |
223 | /// task archive | |
224 | pub fn cleanup_old_tasks(compressed: bool) -> Result<(), Error> { | |
225 | let setup = worker_task_setup()?; | |
226 | ||
227 | let _lock = setup.lock_task_list_files(true)?; | |
228 | ||
229 | let logrotate = LogRotate::new(&setup.task_archive_fn, compressed) | |
230 | .ok_or_else(|| format_err!("could not get archive file names"))?; | |
231 | ||
232 | let mut timestamp = None; | |
233 | if let Some(last_file) = logrotate.files().last() { | |
234 | let reader = BufReader::new(last_file); | |
235 | for line in reader.lines() { | |
236 | let line = line?; | |
237 | if let Ok((_, _, Some(state))) = parse_worker_status_line(&line) { | |
238 | timestamp = Some(state.endtime()); | |
239 | break; | |
240 | } | |
241 | } | |
242 | } | |
243 | ||
244 | fn get_modified(entry: std::fs::DirEntry) -> Result<SystemTime, std::io::Error> { | |
245 | entry.metadata()?.modified() | |
246 | } | |
247 | ||
248 | if let Some(timestamp) = timestamp { | |
249 | let cutoff_time = if timestamp > 0 { | |
250 | SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(timestamp as u64)) | |
251 | } else { | |
252 | SystemTime::UNIX_EPOCH.checked_sub(Duration::from_secs(-timestamp as u64)) | |
253 | }.ok_or_else(|| format_err!("could not calculate cutoff time"))?; | |
254 | ||
255 | for i in 0..256 { | |
256 | let mut path = setup.taskdir.clone(); | |
257 | path.push(format!("{:02X}", i)); | |
258 | for file in std::fs::read_dir(path)? { | |
259 | let file = file?; | |
260 | let path = file.path(); | |
261 | ||
262 | let modified = get_modified(file) | |
263 | .map_err(|err| format_err!("error getting mtime for {:?}: {}", path, err))?; | |
264 | ||
265 | if modified < cutoff_time { | |
266 | match std::fs::remove_file(path) { | |
267 | Ok(()) => {}, | |
268 | Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}, | |
269 | Err(err) => bail!("could not remove file: {}", err), | |
270 | } | |
271 | } | |
272 | } | |
273 | } | |
274 | } | |
275 | ||
276 | Ok(()) | |
277 | } | |
278 | ||
0a33fba4 DM |
279 | |
280 | /// Path to the worker log file | |
281 | pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> { | |
282 | let setup = worker_task_setup()?; | |
283 | Ok(setup.log_path(upid)) | |
284 | } | |
285 | ||
286 | /// Read endtime (time of last log line) and exitstatus from task log file | |
287 | /// If there is not a single line with at valid datetime, we assume the | |
288 | /// starttime to be the endtime | |
289 | pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> { | |
290 | ||
291 | let setup = worker_task_setup()?; | |
292 | ||
293 | let mut status = TaskState::Unknown { endtime: upid.starttime }; | |
294 | ||
295 | let path = setup.log_path(upid); | |
296 | ||
297 | let mut file = File::open(path)?; | |
298 | ||
299 | /// speedup - only read tail | |
300 | use std::io::Seek; | |
301 | use std::io::SeekFrom; | |
302 | let _ = file.seek(SeekFrom::End(-8192)); // ignore errors | |
303 | ||
304 | let mut data = Vec::with_capacity(8192); | |
305 | file.read_to_end(&mut data)?; | |
306 | ||
307 | // strip newlines at the end of the task logs | |
308 | while data.last() == Some(&b'\n') { | |
309 | data.pop(); | |
310 | } | |
311 | ||
312 | let last_line = match data.iter().rposition(|c| *c == b'\n') { | |
313 | Some(start) if data.len() > (start+1) => &data[start+1..], | |
314 | Some(_) => &data, // should not happen, since we removed all trailing newlines | |
315 | None => &data, | |
316 | }; | |
317 | ||
318 | let last_line = std::str::from_utf8(last_line) | |
319 | .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?; | |
320 | ||
321 | let mut iter = last_line.splitn(2, ": "); | |
322 | if let Some(time_str) = iter.next() { | |
6ef1b649 | 323 | if let Ok(endtime) = proxmox_time::parse_rfc3339(time_str) { |
0a33fba4 DM |
324 | // set the endtime even if we cannot parse the state |
325 | status = TaskState::Unknown { endtime }; | |
326 | if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) { | |
327 | if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) { | |
328 | status = state; | |
329 | } | |
330 | } | |
331 | } | |
332 | } | |
333 | ||
334 | Ok(status) | |
346a488e | 335 | } |
784fa1c2 | 336 | |
479f6e40 DM |
337 | lazy_static! { |
338 | static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new()); | |
a68768cf | 339 | } |
d607b886 | 340 | |
a68768cf TL |
341 | /// checks if the task UPID refers to a worker from this process |
342 | fn is_local_worker(upid: &UPID) -> bool { | |
b9700a9f | 343 | upid.pid == crate::pid() && upid.pstart == crate::pstart() |
479f6e40 DM |
344 | } |
345 | ||
634132fe | 346 | /// Test if the task is still running |
5751e495 | 347 | pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> { |
a68768cf | 348 | if is_local_worker(upid) { |
5751e495 DM |
349 | return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)); |
350 | } | |
351 | ||
3984a5fd | 352 | if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() { |
5751e495 DM |
353 | return Ok(false); |
354 | } | |
355 | ||
b9700a9f | 356 | let sock = crate::ctrl_sock_from_pid(upid.pid); |
5751e495 | 357 | let cmd = json!({ |
a68768cf | 358 | "command": "worker-task-status", |
385681c9 TL |
359 | "args": { |
360 | "upid": upid.to_string(), | |
361 | }, | |
5751e495 | 362 | }); |
b9700a9f | 363 | let status = crate::send_command(sock, &cmd).await?; |
4494d078 | 364 | |
5751e495 DM |
365 | if let Some(active) = status.as_bool() { |
366 | Ok(active) | |
367 | } else { | |
368 | bail!("got unexpected result {:?} (expected bool)", status); | |
369 | } | |
370 | } | |
371 | ||
372 | /// Test if the task is still running (fast but inaccurate implementation) | |
373 | /// | |
a68768cf | 374 | /// If the task is spawned from a different process, we simply return if |
5751e495 DM |
375 | /// that process is still running. This information is good enough to detect |
376 | /// stale tasks... | |
77ebbefc | 377 | pub fn worker_is_active_local(upid: &UPID) -> bool { |
a68768cf | 378 | if is_local_worker(upid) { |
62ee2eb4 | 379 | WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) |
634132fe | 380 | } else { |
62ee2eb4 | 381 | procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() |
479f6e40 DM |
382 | } |
383 | } | |
384 | ||
49e25688 | 385 | /// Register task control command on a [CommandSocket]. |
2e44983a DM |
386 | /// |
387 | /// This create two commands: | |
388 | /// | |
389 | /// * ``worker-task-abort <UPID>``: calls [abort_local_worker] | |
390 | /// | |
391 | /// * ``worker-task-status <UPID>``: return true of false, depending on | |
392 | /// whether the worker is running or stopped. | |
a68768cf | 393 | pub fn register_task_control_commands( |
49e25688 | 394 | commando_sock: &mut CommandSocket, |
a68768cf TL |
395 | ) -> Result<(), Error> { |
396 | fn get_upid(args: Option<&Value>) -> Result<UPID, Error> { | |
397 | let args = if let Some(args) = args { args } else { bail!("missing args") }; | |
398 | let upid = match args.get("upid") { | |
399 | Some(Value::String(upid)) => upid.parse::<UPID>()?, | |
400 | None => bail!("no upid in args"), | |
401 | _ => bail!("unable to parse upid"), | |
402 | }; | |
403 | if !is_local_worker(&upid) { | |
d607b886 DM |
404 | bail!("upid does not belong to this process"); |
405 | } | |
a68768cf TL |
406 | Ok(upid) |
407 | } | |
d607b886 | 408 | |
a68768cf TL |
409 | commando_sock.register_command("worker-task-abort".into(), move |args| { |
410 | let upid = get_upid(args)?; | |
411 | ||
3f742f95 DC |
412 | abort_local_worker(upid); |
413 | ||
a68768cf | 414 | Ok(Value::Null) |
d607b886 | 415 | })?; |
a68768cf TL |
416 | commando_sock.register_command("worker-task-status".into(), move |args| { |
417 | let upid = get_upid(args)?; | |
d607b886 | 418 | |
a68768cf TL |
419 | let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id); |
420 | ||
421 | Ok(active.into()) | |
422 | })?; | |
d607b886 DM |
423 | |
424 | Ok(()) | |
425 | } | |
426 | ||
2e44983a DM |
427 | /// Try to abort a worker task, but do no wait |
428 | /// | |
429 | /// Errors (if any) are simply logged. | |
430 | pub fn abort_worker_nowait(upid: UPID) { | |
75fef4b4 WB |
431 | tokio::spawn(async move { |
432 | if let Err(err) = abort_worker(upid).await { | |
2e44983a | 433 | log::error!("abort worker task failed - {}", err); |
321070b4 | 434 | } |
75fef4b4 | 435 | }); |
321070b4 DM |
436 | } |
437 | ||
2e44983a DM |
438 | /// Abort a worker task |
439 | /// | |
440 | /// By sending ``worker-task-abort`` to the control socket. | |
5751e495 | 441 | pub async fn abort_worker(upid: UPID) -> Result<(), Error> { |
321070b4 | 442 | |
b9700a9f | 443 | let sock = crate::ctrl_sock_from_pid(upid.pid); |
321070b4 | 444 | let cmd = json!({ |
a68768cf | 445 | "command": "worker-task-abort", |
385681c9 TL |
446 | "args": { |
447 | "upid": upid.to_string(), | |
448 | }, | |
321070b4 | 449 | }); |
b9700a9f | 450 | crate::send_command(sock, &cmd).map_ok(|_| ()).await |
321070b4 DM |
451 | } |
452 | ||
77bd2a46 | 453 | fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> { |
4b01c983 DM |
454 | |
455 | let data = line.splitn(3, ' ').collect::<Vec<&str>>(); | |
456 | ||
457 | let len = data.len(); | |
458 | ||
459 | match len { | |
460 | 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)), | |
461 | 3 => { | |
462 | let endtime = i64::from_str_radix(data[1], 16)?; | |
77bd2a46 DC |
463 | let state = TaskState::from_endtime_and_message(endtime, data[2])?; |
464 | Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state))) | |
4b01c983 DM |
465 | } |
466 | _ => bail!("wrong number of components"), | |
467 | } | |
468 | } | |
469 | ||
4c116baf | 470 | /// Task State |
77bd2a46 | 471 | #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] |
4c116baf DC |
472 | pub enum TaskState { |
473 | /// The Task ended with an undefined state | |
77bd2a46 | 474 | Unknown { endtime: i64 }, |
4c116baf | 475 | /// The Task ended and there were no errors or warnings |
77bd2a46 | 476 | OK { endtime: i64 }, |
4c116baf | 477 | /// The Task had 'count' amount of warnings and no errors |
77bd2a46 | 478 | Warning { count: u64, endtime: i64 }, |
4c116baf | 479 | /// The Task ended with the error described in 'message' |
77bd2a46 | 480 | Error { message: String, endtime: i64 }, |
4c116baf DC |
481 | } |
482 | ||
483 | impl TaskState { | |
77bd2a46 DC |
484 | pub fn endtime(&self) -> i64 { |
485 | match *self { | |
486 | TaskState::Unknown { endtime } => endtime, | |
487 | TaskState::OK { endtime } => endtime, | |
488 | TaskState::Warning { endtime, .. } => endtime, | |
489 | TaskState::Error { endtime, .. } => endtime, | |
4c116baf DC |
490 | } |
491 | } | |
4c116baf | 492 | |
77bd2a46 | 493 | fn result_text(&self) -> String { |
4c116baf | 494 | match self { |
77bd2a46 DC |
495 | TaskState::Error { message, .. } => format!("TASK ERROR: {}", message), |
496 | other => format!("TASK {}", other), | |
4c116baf DC |
497 | } |
498 | } | |
4c116baf | 499 | |
77bd2a46 | 500 | fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> { |
4c116baf | 501 | if s == "unknown" { |
77bd2a46 | 502 | Ok(TaskState::Unknown { endtime }) |
4c116baf | 503 | } else if s == "OK" { |
77bd2a46 | 504 | Ok(TaskState::OK { endtime }) |
365915da FG |
505 | } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") { |
506 | let count: u64 = warnings.parse()?; | |
77bd2a46 | 507 | Ok(TaskState::Warning{ count, endtime }) |
3984a5fd | 508 | } else if !s.is_empty() { |
365915da | 509 | let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string(); |
77bd2a46 | 510 | Ok(TaskState::Error{ message, endtime }) |
4c116baf DC |
511 | } else { |
512 | bail!("unable to parse Task Status '{}'", s); | |
513 | } | |
514 | } | |
515 | } | |
516 | ||
77bd2a46 DC |
517 | impl std::cmp::PartialOrd for TaskState { |
518 | fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { | |
519 | Some(self.endtime().cmp(&other.endtime())) | |
520 | } | |
521 | } | |
522 | ||
523 | impl std::cmp::Ord for TaskState { | |
524 | fn cmp(&self, other: &Self) -> std::cmp::Ordering { | |
525 | self.endtime().cmp(&other.endtime()) | |
526 | } | |
527 | } | |
528 | ||
529 | impl std::fmt::Display for TaskState { | |
530 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
531 | match self { | |
532 | TaskState::Unknown { .. } => write!(f, "unknown"), | |
533 | TaskState::OK { .. }=> write!(f, "OK"), | |
534 | TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count), | |
535 | TaskState::Error { message, .. } => write!(f, "{}", message), | |
536 | } | |
537 | } | |
538 | } | |
539 | ||
93aebb38 DM |
540 | /// Task details including parsed UPID |
541 | /// | |
542 | /// If there is no `state`, the task is still running. | |
543 | #[derive(Debug)] | |
544 | pub struct TaskListInfo { | |
545 | /// The parsed UPID | |
546 | pub upid: UPID, | |
547 | /// UPID string representation | |
548 | pub upid_str: String, | |
549 | /// Task `(endtime, status)` if already finished | |
77bd2a46 | 550 | pub state: Option<TaskState>, // endtime, status |
93aebb38 DM |
551 | } |
552 | ||
bbeb0256 DC |
553 | fn render_task_line(info: &TaskListInfo) -> String { |
554 | let mut raw = String::new(); | |
555 | if let Some(status) = &info.state { | |
556 | raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status)); | |
557 | } else { | |
558 | raw.push_str(&info.upid_str); | |
559 | raw.push('\n'); | |
560 | } | |
561 | ||
562 | raw | |
563 | } | |
564 | ||
565 | fn render_task_list(list: &[TaskListInfo]) -> String { | |
566 | let mut raw = String::new(); | |
567 | for info in list { | |
568 | raw.push_str(&render_task_line(&info)); | |
569 | } | |
570 | raw | |
571 | } | |
572 | ||
784fa1c2 DC |
573 | // note this is not locked, caller has to make sure it is |
574 | // this will skip (and log) lines that are not valid status lines | |
575 | fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error> | |
576 | { | |
577 | let reader = BufReader::new(reader); | |
578 | let mut list = Vec::new(); | |
579 | for line in reader.lines() { | |
580 | let line = line?; | |
581 | match parse_worker_status_line(&line) { | |
582 | Ok((upid_str, upid, state)) => list.push(TaskListInfo { | |
583 | upid_str, | |
584 | upid, | |
585 | state | |
586 | }), | |
587 | Err(err) => { | |
2e44983a | 588 | log::warn!("unable to parse worker status '{}' - {}", line, err); |
784fa1c2 DC |
589 | continue; |
590 | } | |
591 | }; | |
592 | } | |
593 | ||
594 | Ok(list) | |
595 | } | |
596 | ||
597 | // note this is not locked, caller has to make sure it is | |
598 | fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error> | |
599 | where | |
600 | P: AsRef<std::path::Path> + std::fmt::Debug, | |
601 | { | |
602 | let file = match File::open(&path) { | |
603 | Ok(f) => f, | |
604 | Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()), | |
605 | Err(err) => bail!("unable to open task list {:?} - {}", path, err), | |
606 | }; | |
607 | ||
608 | read_task_file(file) | |
609 | } | |
610 | ||
2e44983a | 611 | /// Iterate over existing/active worker tasks |
e7244387 DC |
612 | pub struct TaskListInfoIterator { |
613 | list: VecDeque<TaskListInfo>, | |
264779e7 | 614 | end: bool, |
e7244387 | 615 | archive: Option<LogRotateFiles>, |
0a33fba4 | 616 | lock: Option<TaskListLockGuard>, |
e7244387 DC |
617 | } |
618 | ||
619 | impl TaskListInfoIterator { | |
2e44983a | 620 | /// Creates a new iterator instance. |
e7244387 | 621 | pub fn new(active_only: bool) -> Result<Self, Error> { |
0a33fba4 DM |
622 | |
623 | let setup = worker_task_setup()?; | |
624 | ||
e7244387 | 625 | let (read_lock, active_list) = { |
0a33fba4 DM |
626 | let lock = setup.lock_task_list_files(false)?; |
627 | let active_list = read_task_file_from_path(&setup.active_tasks_fn)?; | |
e7244387 DC |
628 | |
629 | let needs_update = active_list | |
630 | .iter() | |
df4827f2 | 631 | .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid)); |
e7244387 | 632 | |
264779e7 | 633 | // TODO remove with 1.x |
0a33fba4 | 634 | let index_exists = setup.task_index_fn.is_file(); |
264779e7 DC |
635 | |
636 | if needs_update || index_exists { | |
e7244387 | 637 | drop(lock); |
0a33fba4 DM |
638 | setup.update_active_workers(None)?; |
639 | let lock = setup.lock_task_list_files(false)?; | |
640 | let active_list = read_task_file_from_path(&setup.active_tasks_fn)?; | |
e7244387 DC |
641 | (lock, active_list) |
642 | } else { | |
643 | (lock, active_list) | |
644 | } | |
645 | }; | |
646 | ||
647 | let archive = if active_only { | |
648 | None | |
649 | } else { | |
0a33fba4 | 650 | let logrotate = LogRotate::new(&setup.task_archive_fn, true) |
e4f5f59e | 651 | .ok_or_else(|| format_err!("could not get archive file names"))?; |
e7244387 DC |
652 | Some(logrotate.files()) |
653 | }; | |
654 | ||
e7244387 DC |
655 | let lock = if active_only { None } else { Some(read_lock) }; |
656 | ||
657 | Ok(Self { | |
658 | list: active_list.into(), | |
264779e7 | 659 | end: active_only, |
e7244387 DC |
660 | archive, |
661 | lock, | |
662 | }) | |
663 | } | |
664 | } | |
665 | ||
666 | impl Iterator for TaskListInfoIterator { | |
667 | type Item = Result<TaskListInfo, Error>; | |
668 | ||
669 | fn next(&mut self) -> Option<Self::Item> { | |
670 | loop { | |
671 | if let Some(element) = self.list.pop_back() { | |
672 | return Some(Ok(element)); | |
264779e7 DC |
673 | } else if self.end { |
674 | return None; | |
e7244387 | 675 | } else { |
264779e7 DC |
676 | if let Some(mut archive) = self.archive.take() { |
677 | if let Some(file) = archive.next() { | |
678 | let list = match read_task_file(file) { | |
679 | Ok(list) => list, | |
e7244387 DC |
680 | Err(err) => return Some(Err(err)), |
681 | }; | |
264779e7 DC |
682 | self.list.append(&mut list.into()); |
683 | self.archive = Some(archive); | |
684 | continue; | |
e7244387 | 685 | } |
e7244387 | 686 | } |
264779e7 DC |
687 | |
688 | self.end = true; | |
689 | self.lock.take(); | |
e7244387 DC |
690 | } |
691 | } | |
692 | } | |
693 | } | |
694 | ||
882594c5 DM |
695 | /// Launch long running worker tasks. |
696 | /// | |
697 | /// A worker task can either be a whole thread, or a simply tokio | |
698 | /// task/future. Each task can `log()` messages, which are stored | |
699 | /// persistently to files. Task should poll the `abort_requested` | |
700 | /// flag, and stop execution when requested. | |
479f6e40 | 701 | pub struct WorkerTask { |
0a33fba4 | 702 | setup: &'static WorkerTaskSetup, |
479f6e40 DM |
703 | upid: UPID, |
704 | data: Mutex<WorkerTaskData>, | |
705 | abort_requested: AtomicBool, | |
706 | } | |
707 | ||
708 | impl std::fmt::Display for WorkerTask { | |
709 | ||
710 | fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { | |
711 | self.upid.fmt(f) | |
712 | } | |
713 | } | |
714 | ||
479f6e40 DM |
715 | struct WorkerTaskData { |
716 | logger: FileLogger, | |
717 | progress: f64, // 0..1 | |
f6de2c73 | 718 | warn_count: u64, |
75bc49be | 719 | pub abort_listeners: Vec<oneshot::Sender<()>>, |
479f6e40 DM |
720 | } |
721 | ||
479f6e40 DM |
722 | impl WorkerTask { |
723 | ||
0a33fba4 DM |
724 | pub fn new( |
725 | worker_type: &str, | |
726 | worker_id: Option<String>, | |
727 | auth_id: String, | |
728 | to_stdout: bool, | |
729 | ) -> Result<Arc<Self>, Error> { | |
730 | ||
731 | let setup = worker_task_setup()?; | |
732 | ||
e6dc35ac | 733 | let upid = UPID::new(worker_type, worker_id, auth_id)?; |
634132fe | 734 | let task_id = upid.task_id; |
479f6e40 | 735 | |
0a33fba4 | 736 | let mut path = setup.taskdir.clone(); |
35950380 | 737 | |
7f3d9100 | 738 | path.push(format!("{:02X}", upid.pstart & 255)); |
479f6e40 | 739 | |
0a33fba4 DM |
740 | let dir_opts = setup.file_opts.clone() |
741 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); | |
35950380 | 742 | |
0a33fba4 | 743 | create_path(&path, None, Some(dir_opts))?; |
479f6e40 DM |
744 | |
745 | path.push(upid.to_string()); | |
746 | ||
c0df91f8 | 747 | let logger_options = FileLogOptions { |
913dddea | 748 | to_stdout, |
c0df91f8 | 749 | exclusive: true, |
e5adbc34 | 750 | prefix_time: true, |
c0df91f8 | 751 | read: true, |
0a33fba4 | 752 | file_opts: setup.file_opts.clone(), |
c0df91f8 TL |
753 | ..Default::default() |
754 | }; | |
755 | let logger = FileLogger::new(&path, logger_options)?; | |
479f6e40 DM |
756 | |
757 | let worker = Arc::new(Self { | |
0a33fba4 | 758 | setup, |
05d755b2 | 759 | upid: upid.clone(), |
479f6e40 DM |
760 | abort_requested: AtomicBool::new(false), |
761 | data: Mutex::new(WorkerTaskData { | |
762 | logger, | |
763 | progress: 0.0, | |
f6de2c73 | 764 | warn_count: 0, |
75bc49be | 765 | abort_listeners: vec![], |
479f6e40 DM |
766 | }), |
767 | }); | |
768 | ||
05d755b2 DC |
769 | // scope to drop the lock again after inserting |
770 | { | |
771 | let mut hash = WORKER_TASK_LIST.lock().unwrap(); | |
772 | hash.insert(task_id, worker.clone()); | |
b9700a9f | 773 | crate::set_worker_count(hash.len()); |
05d755b2 | 774 | } |
7a630df7 | 775 | |
0a33fba4 | 776 | setup.update_active_workers(Some(&upid))?; |
479f6e40 DM |
777 | |
778 | Ok(worker) | |
779 | } | |
780 | ||
882594c5 | 781 | /// Spawn a new tokio task/future. |
660c6846 DM |
782 | pub fn spawn<F, T>( |
783 | worker_type: &str, | |
784 | worker_id: Option<String>, | |
049a22a3 | 785 | auth_id: String, |
660c6846 DM |
786 | to_stdout: bool, |
787 | f: F, | |
788 | ) -> Result<String, Error> | |
479f6e40 | 789 | where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T, |
75fef4b4 | 790 | T: Send + 'static + Future<Output = Result<(), Error>>, |
479f6e40 | 791 | { |
e6dc35ac | 792 | let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?; |
660c6846 | 793 | let upid_str = worker.upid.to_string(); |
75fef4b4 WB |
794 | let f = f(worker.clone()); |
795 | tokio::spawn(async move { | |
796 | let result = f.await; | |
dd8e744f | 797 | worker.log_result(&result); |
75fef4b4 | 798 | }); |
479f6e40 | 799 | |
660c6846 | 800 | Ok(upid_str) |
479f6e40 DM |
801 | } |
802 | ||
882594c5 | 803 | /// Create a new worker thread. |
660c6846 DM |
804 | pub fn new_thread<F>( |
805 | worker_type: &str, | |
806 | worker_id: Option<String>, | |
049a22a3 | 807 | auth_id: String, |
660c6846 DM |
808 | to_stdout: bool, |
809 | f: F, | |
810 | ) -> Result<String, Error> | |
d3f4c08f | 811 | where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error> |
479f6e40 | 812 | { |
e6dc35ac | 813 | let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?; |
660c6846 | 814 | let upid_str = worker.upid.to_string(); |
479f6e40 | 815 | |
217170e1 | 816 | let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || { |
d3f4c08f DM |
817 | let worker1 = worker.clone(); |
818 | let result = match std::panic::catch_unwind(move || f(worker1)) { | |
819 | Ok(r) => r, | |
820 | Err(panic) => { | |
821 | match panic.downcast::<&str>() { | |
822 | Ok(panic_msg) => { | |
823 | Err(format_err!("worker panicked: {}", panic_msg)) | |
824 | } | |
825 | Err(_) => { | |
826 | Err(format_err!("worker panicked: unknown type.")) | |
827 | } | |
828 | } | |
829 | } | |
830 | }; | |
831 | ||
dd8e744f | 832 | worker.log_result(&result); |
479f6e40 DM |
833 | }); |
834 | ||
660c6846 | 835 | Ok(upid_str) |
479f6e40 DM |
836 | } |
837 | ||
4c116baf DC |
838 | /// create state from self and a result |
839 | pub fn create_state(&self, result: &Result<(), Error>) -> TaskState { | |
f6de2c73 | 840 | let warn_count = self.data.lock().unwrap().warn_count; |
cef03f41 | 841 | |
6ef1b649 | 842 | let endtime = proxmox_time::epoch_i64(); |
77bd2a46 | 843 | |
4b01c983 | 844 | if let Err(err) = result { |
77bd2a46 | 845 | TaskState::Error { message: err.to_string(), endtime } |
f6de2c73 | 846 | } else if warn_count > 0 { |
77bd2a46 | 847 | TaskState::Warning { count: warn_count, endtime } |
4b01c983 | 848 | } else { |
77bd2a46 | 849 | TaskState::OK { endtime } |
4b01c983 | 850 | } |
cef03f41 DC |
851 | } |
852 | ||
853 | /// Log task result, remove task from running list | |
854 | pub fn log_result(&self, result: &Result<(), Error>) { | |
4c116baf | 855 | let state = self.create_state(result); |
1ec0d70d | 856 | self.log_message(state.result_text()); |
418def7a DM |
857 | |
858 | WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id); | |
0a33fba4 | 859 | let _ = self.setup.update_active_workers(None); |
b9700a9f | 860 | crate::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len()); |
4b01c983 DM |
861 | } |
862 | ||
882594c5 | 863 | /// Log a message. |
1ec0d70d | 864 | pub fn log_message<S: AsRef<str>>(&self, msg: S) { |
479f6e40 DM |
865 | let mut data = self.data.lock().unwrap(); |
866 | data.logger.log(msg); | |
867 | } | |
868 | ||
f6de2c73 | 869 | /// Log a message as warning. |
1ec0d70d | 870 | pub fn log_warning<S: AsRef<str>>(&self, msg: S) { |
f6de2c73 DC |
871 | let mut data = self.data.lock().unwrap(); |
872 | data.logger.log(format!("WARN: {}", msg.as_ref())); | |
873 | data.warn_count += 1; | |
874 | } | |
875 | ||
882594c5 | 876 | /// Set progress indicator |
479f6e40 DM |
877 | pub fn progress(&self, progress: f64) { |
878 | if progress >= 0.0 && progress <= 1.0 { | |
879 | let mut data = self.data.lock().unwrap(); | |
880 | data.progress = progress; | |
881 | } else { | |
882 | // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress); | |
883 | } | |
884 | } | |
885 | ||
882594c5 | 886 | /// Request abort |
d607b886 | 887 | pub fn request_abort(&self) { |
a6c16894 DM |
888 | let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst); |
889 | if !prev_abort { // log abort one time | |
1ec0d70d | 890 | self.log_message(format!("received abort request ...")); |
a6c16894 | 891 | } |
75bc49be DM |
892 | // noitify listeners |
893 | let mut data = self.data.lock().unwrap(); | |
894 | loop { | |
895 | match data.abort_listeners.pop() { | |
896 | None => { break; }, | |
897 | Some(ch) => { | |
d1d74c43 | 898 | let _ = ch.send(()); // ignore errors here |
75bc49be DM |
899 | }, |
900 | } | |
901 | } | |
479f6e40 DM |
902 | } |
903 | ||
75bc49be DM |
904 | /// Get a future which resolves on task abort |
905 | pub fn abort_future(&self) -> oneshot::Receiver<()> { | |
906 | let (tx, rx) = oneshot::channel::<()>(); | |
907 | ||
908 | let mut data = self.data.lock().unwrap(); | |
909 | if self.abort_requested() { | |
910 | let _ = tx.send(()); | |
911 | } else { | |
912 | data.abort_listeners.push(tx); | |
913 | } | |
914 | rx | |
915 | } | |
4bd2a9e4 DC |
916 | |
917 | pub fn upid(&self) -> &UPID { | |
918 | &self.upid | |
919 | } | |
479f6e40 | 920 | } |
d1993187 | 921 | |
c8449217 | 922 | impl WorkerTaskContext for WorkerTask { |
619cd5cb DM |
923 | |
924 | fn abort_requested(&self) -> bool { | |
925 | self.abort_requested.load(Ordering::SeqCst) | |
d1993187 WB |
926 | } |
927 | ||
0fd55b08 DM |
928 | fn shutdown_requested(&self) -> bool { |
929 | crate::shutdown_requested() | |
930 | } | |
931 | ||
932 | fn fail_on_shutdown(&self) -> Result<(), Error> { | |
933 | crate::fail_on_shutdown() | |
934 | } | |
935 | ||
d1993187 WB |
936 | fn log(&self, level: log::Level, message: &std::fmt::Arguments) { |
937 | match level { | |
1ec0d70d DM |
938 | log::Level::Error => self.log_warning(&message.to_string()), |
939 | log::Level::Warn => self.log_warning(&message.to_string()), | |
940 | log::Level::Info => self.log_message(&message.to_string()), | |
941 | log::Level::Debug => self.log_message(&format!("DEBUG: {}", message)), | |
942 | log::Level::Trace => self.log_message(&format!("TRACE: {}", message)), | |
d1993187 WB |
943 | } |
944 | } | |
945 | } | |
72fbe9ff WB |
946 | |
947 | /// Wait for a locally spanned worker task | |
948 | /// | |
949 | /// Note: local workers should print logs to stdout, so there is no | |
950 | /// need to fetch/display logs. We just wait for the worker to finish. | |
951 | pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> { | |
952 | ||
953 | let upid: UPID = upid_str.parse()?; | |
954 | ||
955 | let sleep_duration = core::time::Duration::new(0, 100_000_000); | |
956 | ||
957 | loop { | |
958 | if worker_is_active_local(&upid) { | |
959 | tokio::time::sleep(sleep_duration).await; | |
960 | } else { | |
961 | break; | |
962 | } | |
963 | } | |
964 | Ok(()) | |
965 | } | |
3f742f95 DC |
966 | |
967 | /// Request abort of a local worker (if existing and running) | |
968 | pub fn abort_local_worker(upid: UPID) { | |
969 | if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) { | |
970 | worker.request_abort(); | |
971 | } | |
972 | } |