]>
Commit | Line | Data |
---|---|---|
e8c124fe DM |
1 | use std::collections::{HashMap, VecDeque}; |
2 | use std::fs::File; | |
2bda552b | 3 | use std::io::{BufRead, BufReader, Read, Write}; |
e8c124fe | 4 | use std::panic::UnwindSafe; |
2bda552b | 5 | use std::path::PathBuf; |
e8c124fe DM |
6 | use std::sync::atomic::{AtomicBool, Ordering}; |
7 | use std::sync::{Arc, Mutex}; | |
2bda552b | 8 | use std::time::{Duration, SystemTime}; |
e8c124fe DM |
9 | |
10 | use anyhow::{bail, format_err, Error}; | |
11 | use futures::*; | |
12 | use lazy_static::lazy_static; | |
e8c124fe DM |
13 | use nix::fcntl::OFlag; |
14 | use once_cell::sync::OnceCell; | |
2bda552b TL |
15 | use serde::{Deserialize, Serialize}; |
16 | use serde_json::{json, Value}; | |
17 | use tokio::sync::oneshot; | |
e8c124fe | 18 | |
09046671 WB |
19 | use proxmox_lang::try_block; |
20 | use proxmox_schema::upid::UPID; | |
2bda552b TL |
21 | use proxmox_sys::fs::{atomic_open_or_create_file, create_path, replace_file, CreateOptions}; |
22 | use proxmox_sys::linux::procfs; | |
6c856eed | 23 | use proxmox_sys::task_warn; |
e8c124fe | 24 | |
ef69d1ae | 25 | use proxmox_sys::logrotate::{LogRotate, LogRotateFiles}; |
2bda552b | 26 | use proxmox_sys::WorkerTaskContext; |
e8c124fe | 27 | |
2bda552b | 28 | use crate::{CommandSocket, FileLogOptions, FileLogger}; |
e8c124fe DM |
29 | |
/// Guard holding the opened (and locked) task list lock file.
///
/// Created by `WorkerTaskSetup::lock_task_list_files`. Callers in this file release the
/// lock by dropping the guard (e.g. `drop(lock)`, `self.lock.take()`), which closes the file.
struct TaskListLockGuard(File);
31 | ||
/// Paths and file options of the worker task library, created once by `init_worker_tasks`.
struct WorkerTaskSetup {
    /// Base create options for all task files; callers override the permission bits
    /// (`perm`) per use.
    file_opts: CreateOptions,
    /// `<basedir>/tasks`, containing the per-`pstart` task log subdirectories.
    taskdir: PathBuf,
    /// Lock file (`.active.lock`) guarding the task list files below.
    task_lock_fn: PathBuf,
    /// List of active (not yet finished) tasks.
    active_tasks_fn: PathBuf,
    /// Legacy task index file, only read for compatibility and then removed.
    task_index_fn: PathBuf,
    /// Archive of finished tasks (rotated by `rotate_task_log_archive`).
    task_archive_fn: PathBuf,
}
40 | ||
// Global setup, set exactly once by `init_worker_tasks` and read via `worker_task_setup`.
static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
42 | ||
43 | fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> { | |
2bda552b TL |
44 | WORKER_TASK_SETUP |
45 | .get() | |
e8c124fe DM |
46 | .ok_or_else(|| format_err!("WorkerTask library is not initialized")) |
47 | } | |
48 | ||
49 | impl WorkerTaskSetup { | |
e8c124fe | 50 | fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self { |
6ad9248c | 51 | let mut taskdir = basedir; |
e8c124fe DM |
52 | taskdir.push("tasks"); |
53 | ||
54 | let mut task_lock_fn = taskdir.clone(); | |
55 | task_lock_fn.push(".active.lock"); | |
56 | ||
57 | let mut active_tasks_fn = taskdir.clone(); | |
58 | active_tasks_fn.push("active"); | |
59 | ||
60 | let mut task_index_fn = taskdir.clone(); | |
61 | task_index_fn.push("index"); | |
62 | ||
63 | let mut task_archive_fn = taskdir.clone(); | |
64 | task_archive_fn.push("archive"); | |
65 | ||
66 | Self { | |
67 | file_opts, | |
68 | taskdir, | |
69 | task_lock_fn, | |
70 | active_tasks_fn, | |
71 | task_index_fn, | |
72 | task_archive_fn, | |
73 | } | |
74 | } | |
75 | ||
76 | fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> { | |
2bda552b TL |
77 | let options = self |
78 | .file_opts | |
79 | .clone() | |
e8c124fe DM |
80 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); |
81 | ||
82 | let timeout = std::time::Duration::new(10, 0); | |
83 | ||
2bda552b TL |
84 | let file = |
85 | proxmox_sys::fs::open_file_locked(&self.task_lock_fn, timeout, exclusive, options)?; | |
e8c124fe DM |
86 | |
87 | Ok(TaskListLockGuard(file)) | |
88 | } | |
89 | ||
90 | fn log_path(&self, upid: &UPID) -> std::path::PathBuf { | |
91 | let mut path = self.taskdir.clone(); | |
92 | path.push(format!("{:02X}", upid.pstart % 256)); | |
93 | path.push(upid.to_string()); | |
94 | path | |
95 | } | |
96 | ||
97 | // atomically read/update the task list, update status of finished tasks | |
98 | // new_upid is added to the list when specified. | |
99 | fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> { | |
e8c124fe DM |
100 | let lock = self.lock_task_list_files(true)?; |
101 | ||
102 | // TODO remove with 1.x | |
103 | let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?; | |
104 | let had_index_file = !finish_list.is_empty(); | |
105 | ||
106 | // We use filter_map because one negative case wants to *move* the data into `finish_list`, | |
107 | // clippy doesn't quite catch this! | |
108 | #[allow(clippy::unnecessary_filter_map)] | |
109 | let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)? | |
110 | .into_iter() | |
111 | .filter_map(|info| { | |
112 | if info.state.is_some() { | |
113 | // this can happen when the active file still includes finished tasks | |
114 | finish_list.push(info); | |
115 | return None; | |
116 | } | |
117 | ||
118 | if !worker_is_active_local(&info.upid) { | |
119 | // println!("Detected stopped task '{}'", &info.upid_str); | |
09046671 | 120 | let now = proxmox_time::epoch_i64(); |
2bda552b TL |
121 | let status = |
122 | upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now }); | |
e8c124fe DM |
123 | finish_list.push(TaskListInfo { |
124 | upid: info.upid, | |
125 | upid_str: info.upid_str, | |
2bda552b | 126 | state: Some(status), |
e8c124fe DM |
127 | }); |
128 | return None; | |
129 | } | |
130 | ||
131 | Some(info) | |
2bda552b TL |
132 | }) |
133 | .collect(); | |
e8c124fe DM |
134 | |
135 | if let Some(upid) = new_upid { | |
2bda552b TL |
136 | active_list.push(TaskListInfo { |
137 | upid: upid.clone(), | |
138 | upid_str: upid.to_string(), | |
139 | state: None, | |
140 | }); | |
e8c124fe DM |
141 | } |
142 | ||
143 | let active_raw = render_task_list(&active_list); | |
144 | ||
2bda552b TL |
145 | let options = self |
146 | .file_opts | |
147 | .clone() | |
e8c124fe DM |
148 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); |
149 | ||
2bda552b TL |
150 | replace_file(&self.active_tasks_fn, active_raw.as_bytes(), options, false)?; |
151 | ||
152 | finish_list.sort_unstable_by(|a, b| match (&a.state, &b.state) { | |
153 | (Some(s1), Some(s2)) => s1.cmp(s2), | |
154 | (Some(_), None) => std::cmp::Ordering::Less, | |
155 | (None, Some(_)) => std::cmp::Ordering::Greater, | |
156 | _ => a.upid.starttime.cmp(&b.upid.starttime), | |
e8c124fe DM |
157 | }); |
158 | ||
159 | if !finish_list.is_empty() { | |
2bda552b TL |
160 | let options = self |
161 | .file_opts | |
162 | .clone() | |
e8c124fe DM |
163 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); |
164 | ||
165 | let mut writer = atomic_open_or_create_file( | |
166 | &self.task_archive_fn, | |
167 | OFlag::O_APPEND | OFlag::O_RDWR, | |
168 | &[], | |
169 | options, | |
e9bea7b7 | 170 | false, |
e8c124fe DM |
171 | )?; |
172 | for info in &finish_list { | |
647a0db8 | 173 | writer.write_all(render_task_line(info).as_bytes())?; |
e8c124fe DM |
174 | } |
175 | } | |
176 | ||
177 | // TODO Remove with 1.x | |
178 | // for compatibility, if we had an INDEX file, we do not need it anymore | |
179 | if had_index_file { | |
180 | let _ = nix::unistd::unlink(&self.task_index_fn); | |
181 | } | |
182 | ||
183 | drop(lock); | |
184 | ||
185 | Ok(()) | |
186 | } | |
187 | ||
188 | // Create task log directory with correct permissions | |
189 | fn create_task_log_dirs(&self) -> Result<(), Error> { | |
e8c124fe | 190 | try_block!({ |
2bda552b TL |
191 | let dir_opts = self |
192 | .file_opts | |
193 | .clone() | |
e8c124fe DM |
194 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); |
195 | ||
6ad9248c | 196 | create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts))?; |
e8c124fe DM |
197 | // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?; |
198 | Ok(()) | |
2bda552b TL |
199 | }) |
200 | .map_err(|err: Error| format_err!("unable to create task log dir - {}", err)) | |
e8c124fe DM |
201 | } |
202 | } | |
203 | ||
204 | /// Initialize the WorkerTask library | |
205 | pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> { | |
206 | let setup = WorkerTaskSetup::new(basedir, file_opts); | |
207 | setup.create_task_log_dirs()?; | |
2bda552b TL |
208 | WORKER_TASK_SETUP |
209 | .set(setup) | |
e8c124fe DM |
210 | .map_err(|_| format_err!("init_worker_tasks failed - already initialized")) |
211 | } | |
212 | ||
213 | /// checks if the Task Archive is bigger that 'size_threshold' bytes, and | |
bd8fd62d DC |
214 | /// rotates it if it is, keeps only up to 'max_files'. |
215 | /// If 'max_days' is given, 'max_files' is ignored, and all archive files | |
216 | /// will be deleted where there are only tasks that are older than the given days. | |
ef69d1ae DM |
217 | pub fn rotate_task_log_archive( |
218 | size_threshold: u64, | |
219 | compress: bool, | |
220 | max_files: Option<usize>, | |
bd8fd62d | 221 | max_days: Option<usize>, |
ef69d1ae DM |
222 | options: Option<CreateOptions>, |
223 | ) -> Result<bool, Error> { | |
e8c124fe DM |
224 | let setup = worker_task_setup()?; |
225 | ||
226 | let _lock = setup.lock_task_list_files(true)?; | |
227 | ||
bd8fd62d DC |
228 | let mut logrotate = LogRotate::new( |
229 | &setup.task_archive_fn, | |
230 | compress, | |
231 | if max_days.is_none() { max_files } else { None }, | |
232 | options, | |
233 | )?; | |
234 | ||
235 | let mut rotated = logrotate.rotate(size_threshold)?; | |
236 | ||
237 | if let Some(max_days) = max_days { | |
238 | let mut delete = false; | |
239 | let file_names = logrotate.file_names(); | |
240 | let mut files = logrotate.files(); | |
5eaab786 TL |
241 | // task archives have task-logs sorted by endtime, with the oldest at the start of the |
242 | // file. So, peak into every archive and see if the first listed tasks' endtime would be | |
243 | // cut off. If that's the case we know that the next (older) task archive rotation surely | |
244 | // falls behind the cut-off line. We cannot say the same for the current archive without | |
245 | // checking its last (newest) line, but that's more complex, expensive and rather unlikely. | |
bd8fd62d DC |
246 | for file_name in file_names { |
247 | if !delete { | |
248 | // we only have to check if we did not start deleting already | |
249 | ||
250 | // this is ok because the task log files are locked, so no one | |
251 | // else should modify these | |
252 | let reader = match files.next() { | |
253 | Some(file) => BufReader::new(file), | |
254 | None => { | |
255 | bail!("unexpected error: files do not match file_names"); | |
256 | } | |
257 | }; | |
258 | if let Some(line) = reader.lines().next() { | |
259 | if let Ok((_, _, Some(state))) = parse_worker_status_line(&line?) { | |
260 | // we approximate here with the days, but should be close enough | |
261 | let cutoff_time = | |
262 | proxmox_time::epoch_i64() - (max_days * 24 * 60 * 60) as i64; | |
263 | if state.endtime() < cutoff_time { | |
264 | // we found the first file that has only older entries, start deleting | |
265 | delete = true; | |
266 | rotated = true; | |
267 | } | |
268 | } | |
269 | } | |
5eaab786 | 270 | } else { |
bd8fd62d DC |
271 | if let Err(err) = std::fs::remove_file(&file_name) { |
272 | log::error!("could not remove {:?}: {}", file_name, err); | |
273 | } | |
274 | } | |
275 | } | |
276 | } | |
e8c124fe | 277 | |
bd8fd62d | 278 | Ok(rotated) |
e8c124fe DM |
279 | } |
280 | ||
fb6823b5 DC |
281 | /// removes all task logs that are older than the oldest task entry in the |
282 | /// task archive | |
6c856eed | 283 | pub fn cleanup_old_tasks(worker: &dyn WorkerTaskContext, compressed: bool) -> Result<(), Error> { |
fb6823b5 DC |
284 | let setup = worker_task_setup()?; |
285 | ||
286 | let _lock = setup.lock_task_list_files(true)?; | |
287 | ||
2bda552b | 288 | let logrotate = LogRotate::new(&setup.task_archive_fn, compressed, None, None)?; |
fb6823b5 DC |
289 | |
290 | let mut timestamp = None; | |
291 | if let Some(last_file) = logrotate.files().last() { | |
292 | let reader = BufReader::new(last_file); | |
293 | for line in reader.lines() { | |
294 | let line = line?; | |
295 | if let Ok((_, _, Some(state))) = parse_worker_status_line(&line) { | |
296 | timestamp = Some(state.endtime()); | |
297 | break; | |
298 | } | |
299 | } | |
300 | } | |
301 | ||
302 | fn get_modified(entry: std::fs::DirEntry) -> Result<SystemTime, std::io::Error> { | |
303 | entry.metadata()?.modified() | |
304 | } | |
305 | ||
306 | if let Some(timestamp) = timestamp { | |
307 | let cutoff_time = if timestamp > 0 { | |
308 | SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(timestamp as u64)) | |
309 | } else { | |
310 | SystemTime::UNIX_EPOCH.checked_sub(Duration::from_secs(-timestamp as u64)) | |
2bda552b TL |
311 | } |
312 | .ok_or_else(|| format_err!("could not calculate cutoff time"))?; | |
fb6823b5 DC |
313 | |
314 | for i in 0..256 { | |
315 | let mut path = setup.taskdir.clone(); | |
316 | path.push(format!("{:02X}", i)); | |
6c856eed DC |
317 | let files = match std::fs::read_dir(path) { |
318 | Ok(files) => files, | |
319 | Err(err) if err.kind() == std::io::ErrorKind::NotFound => continue, | |
320 | Err(err) => { | |
321 | task_warn!(worker, "could not check task logs in '{:02X}': {}", i, err); | |
322 | continue; | |
323 | } | |
324 | }; | |
325 | for file in files { | |
326 | let file = match file { | |
327 | Ok(file) => file, | |
328 | Err(err) => { | |
329 | task_warn!( | |
330 | worker, | |
331 | "could not check some task log in '{:02X}': {}", | |
332 | i, | |
333 | err | |
334 | ); | |
335 | continue; | |
336 | } | |
337 | }; | |
fb6823b5 DC |
338 | let path = file.path(); |
339 | ||
6c856eed DC |
340 | let modified = match get_modified(file) { |
341 | Ok(modified) => modified, | |
342 | Err(err) => { | |
343 | task_warn!(worker, "error getting mtime for '{:?}': {}", path, err); | |
344 | continue; | |
345 | } | |
346 | }; | |
fb6823b5 DC |
347 | |
348 | if modified < cutoff_time { | |
6c856eed | 349 | match std::fs::remove_file(&path) { |
2bda552b TL |
350 | Ok(()) => {} |
351 | Err(err) if err.kind() == std::io::ErrorKind::NotFound => {} | |
6c856eed DC |
352 | Err(err) => { |
353 | task_warn!(worker, "could not remove file '{:?}': {}", path, err) | |
354 | } | |
fb6823b5 DC |
355 | } |
356 | } | |
357 | } | |
358 | } | |
359 | } | |
360 | ||
361 | Ok(()) | |
362 | } | |
363 | ||
e8c124fe DM |
364 | /// Path to the worker log file |
365 | pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> { | |
366 | let setup = worker_task_setup()?; | |
367 | Ok(setup.log_path(upid)) | |
368 | } | |
369 | ||
370 | /// Read endtime (time of last log line) and exitstatus from task log file | |
371 | /// If there is not a single line with at valid datetime, we assume the | |
372 | /// starttime to be the endtime | |
373 | pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> { | |
e8c124fe DM |
374 | let setup = worker_task_setup()?; |
375 | ||
2bda552b TL |
376 | let mut status = TaskState::Unknown { |
377 | endtime: upid.starttime, | |
378 | }; | |
e8c124fe DM |
379 | |
380 | let path = setup.log_path(upid); | |
381 | ||
382 | let mut file = File::open(path)?; | |
383 | ||
384 | /// speedup - only read tail | |
385 | use std::io::Seek; | |
386 | use std::io::SeekFrom; | |
387 | let _ = file.seek(SeekFrom::End(-8192)); // ignore errors | |
388 | ||
389 | let mut data = Vec::with_capacity(8192); | |
390 | file.read_to_end(&mut data)?; | |
391 | ||
392 | // strip newlines at the end of the task logs | |
393 | while data.last() == Some(&b'\n') { | |
394 | data.pop(); | |
395 | } | |
396 | ||
397 | let last_line = match data.iter().rposition(|c| *c == b'\n') { | |
2bda552b | 398 | Some(start) if data.len() > (start + 1) => &data[start + 1..], |
e8c124fe DM |
399 | Some(_) => &data, // should not happen, since we removed all trailing newlines |
400 | None => &data, | |
401 | }; | |
402 | ||
403 | let last_line = std::str::from_utf8(last_line) | |
404 | .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?; | |
405 | ||
406 | let mut iter = last_line.splitn(2, ": "); | |
407 | if let Some(time_str) = iter.next() { | |
09046671 | 408 | if let Ok(endtime) = proxmox_time::parse_rfc3339(time_str) { |
e8c124fe DM |
409 | // set the endtime even if we cannot parse the state |
410 | status = TaskState::Unknown { endtime }; | |
411 | if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) { | |
412 | if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) { | |
413 | status = state; | |
414 | } | |
415 | } | |
416 | } | |
417 | } | |
418 | ||
419 | Ok(status) | |
420 | } | |
421 | ||
lazy_static! {
    // Workers running in this process, keyed by `UPID::task_id`.
    // Used to answer "is this local worker still active?" queries.
    static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> =
        Mutex::new(HashMap::new());
}
426 | ||
427 | /// checks if the task UPID refers to a worker from this process | |
428 | fn is_local_worker(upid: &UPID) -> bool { | |
429 | upid.pid == crate::pid() && upid.pstart == crate::pstart() | |
430 | } | |
431 | ||
432 | /// Test if the task is still running | |
433 | pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> { | |
434 | if is_local_worker(upid) { | |
435 | return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)); | |
436 | } | |
437 | ||
438 | if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() { | |
439 | return Ok(false); | |
440 | } | |
441 | ||
442 | let sock = crate::ctrl_sock_from_pid(upid.pid); | |
443 | let cmd = json!({ | |
444 | "command": "worker-task-status", | |
445 | "args": { | |
446 | "upid": upid.to_string(), | |
447 | }, | |
448 | }); | |
449 | let status = crate::send_command(sock, &cmd).await?; | |
450 | ||
451 | if let Some(active) = status.as_bool() { | |
452 | Ok(active) | |
453 | } else { | |
454 | bail!("got unexpected result {:?} (expected bool)", status); | |
455 | } | |
456 | } | |
457 | ||
458 | /// Test if the task is still running (fast but inaccurate implementation) | |
459 | /// | |
460 | /// If the task is spawned from a different process, we simply return if | |
461 | /// that process is still running. This information is good enough to detect | |
462 | /// stale tasks... | |
463 | pub fn worker_is_active_local(upid: &UPID) -> bool { | |
464 | if is_local_worker(upid) { | |
465 | WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) | |
466 | } else { | |
467 | procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() | |
468 | } | |
469 | } | |
470 | ||
ad449a57 | 471 | /// Register task control command on a [CommandSocket]. |
9cb2c97c DM |
472 | /// |
473 | /// This create two commands: | |
474 | /// | |
475 | /// * ``worker-task-abort <UPID>``: calls [abort_local_worker] | |
476 | /// | |
477 | /// * ``worker-task-status <UPID>``: return true of false, depending on | |
478 | /// whether the worker is running or stopped. | |
2bda552b | 479 | pub fn register_task_control_commands(commando_sock: &mut CommandSocket) -> Result<(), Error> { |
e8c124fe | 480 | fn get_upid(args: Option<&Value>) -> Result<UPID, Error> { |
2bda552b TL |
481 | let args = if let Some(args) = args { |
482 | args | |
483 | } else { | |
484 | bail!("missing args") | |
485 | }; | |
e8c124fe DM |
486 | let upid = match args.get("upid") { |
487 | Some(Value::String(upid)) => upid.parse::<UPID>()?, | |
488 | None => bail!("no upid in args"), | |
489 | _ => bail!("unable to parse upid"), | |
490 | }; | |
491 | if !is_local_worker(&upid) { | |
492 | bail!("upid does not belong to this process"); | |
493 | } | |
494 | Ok(upid) | |
495 | } | |
496 | ||
497 | commando_sock.register_command("worker-task-abort".into(), move |args| { | |
498 | let upid = get_upid(args)?; | |
499 | ||
500 | abort_local_worker(upid); | |
501 | ||
502 | Ok(Value::Null) | |
503 | })?; | |
504 | commando_sock.register_command("worker-task-status".into(), move |args| { | |
505 | let upid = get_upid(args)?; | |
506 | ||
507 | let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id); | |
508 | ||
509 | Ok(active.into()) | |
510 | })?; | |
511 | ||
512 | Ok(()) | |
513 | } | |
514 | ||
9cb2c97c DM |
515 | /// Try to abort a worker task, but do no wait |
516 | /// | |
517 | /// Errors (if any) are simply logged. | |
518 | pub fn abort_worker_nowait(upid: UPID) { | |
e8c124fe DM |
519 | tokio::spawn(async move { |
520 | if let Err(err) = abort_worker(upid).await { | |
9cb2c97c | 521 | log::error!("abort worker task failed - {}", err); |
e8c124fe DM |
522 | } |
523 | }); | |
524 | } | |
525 | ||
9cb2c97c DM |
526 | /// Abort a worker task |
527 | /// | |
528 | /// By sending ``worker-task-abort`` to the control socket. | |
e8c124fe | 529 | pub async fn abort_worker(upid: UPID) -> Result<(), Error> { |
e8c124fe DM |
530 | let sock = crate::ctrl_sock_from_pid(upid.pid); |
531 | let cmd = json!({ | |
532 | "command": "worker-task-abort", | |
533 | "args": { | |
534 | "upid": upid.to_string(), | |
535 | }, | |
536 | }); | |
537 | crate::send_command(sock, &cmd).map_ok(|_| ()).await | |
538 | } | |
539 | ||
540 | fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> { | |
e8c124fe DM |
541 | let data = line.splitn(3, ' ').collect::<Vec<&str>>(); |
542 | ||
543 | let len = data.len(); | |
544 | ||
545 | match len { | |
546 | 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)), | |
547 | 3 => { | |
548 | let endtime = i64::from_str_radix(data[1], 16)?; | |
549 | let state = TaskState::from_endtime_and_message(endtime, data[2])?; | |
550 | Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state))) | |
551 | } | |
552 | _ => bail!("wrong number of components"), | |
553 | } | |
554 | } | |
555 | ||
/// Task State
///
/// NOTE(review): `PartialEq`/`Eq` are derived (comparing all fields), while `Ord`/`PartialOrd`
/// below compare only the end time, so two states can be unequal yet compare as `Equal`.
/// This violates the documented `Ord` consistency requirement; left as is to keep behavior.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskState {
    /// The Task ended with an undefined state
    Unknown { endtime: i64 },
    /// The Task ended and there were no errors or warnings
    OK { endtime: i64 },
    /// The Task had 'count' amount of warnings and no errors
    Warning { count: u64, endtime: i64 },
    /// The Task ended with the error described in 'message'
    Error { message: String, endtime: i64 },
}
568 | ||
569 | impl TaskState { | |
570 | pub fn endtime(&self) -> i64 { | |
571 | match *self { | |
572 | TaskState::Unknown { endtime } => endtime, | |
573 | TaskState::OK { endtime } => endtime, | |
574 | TaskState::Warning { endtime, .. } => endtime, | |
575 | TaskState::Error { endtime, .. } => endtime, | |
576 | } | |
577 | } | |
578 | ||
579 | fn result_text(&self) -> String { | |
580 | match self { | |
581 | TaskState::Error { message, .. } => format!("TASK ERROR: {}", message), | |
582 | other => format!("TASK {}", other), | |
583 | } | |
584 | } | |
585 | ||
586 | fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> { | |
587 | if s == "unknown" { | |
588 | Ok(TaskState::Unknown { endtime }) | |
589 | } else if s == "OK" { | |
590 | Ok(TaskState::OK { endtime }) | |
591 | } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") { | |
592 | let count: u64 = warnings.parse()?; | |
2bda552b | 593 | Ok(TaskState::Warning { count, endtime }) |
e8c124fe | 594 | } else if !s.is_empty() { |
2bda552b TL |
595 | let message = if let Some(err) = s.strip_prefix("ERROR: ") { |
596 | err | |
597 | } else { | |
598 | s | |
599 | } | |
600 | .to_string(); | |
601 | Ok(TaskState::Error { message, endtime }) | |
e8c124fe DM |
602 | } else { |
603 | bail!("unable to parse Task Status '{}'", s); | |
604 | } | |
605 | } | |
606 | } | |
607 | ||
608 | impl std::cmp::PartialOrd for TaskState { | |
609 | fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { | |
610 | Some(self.endtime().cmp(&other.endtime())) | |
611 | } | |
612 | } | |
613 | ||
614 | impl std::cmp::Ord for TaskState { | |
615 | fn cmp(&self, other: &Self) -> std::cmp::Ordering { | |
616 | self.endtime().cmp(&other.endtime()) | |
617 | } | |
618 | } | |
619 | ||
620 | impl std::fmt::Display for TaskState { | |
621 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
622 | match self { | |
623 | TaskState::Unknown { .. } => write!(f, "unknown"), | |
2bda552b | 624 | TaskState::OK { .. } => write!(f, "OK"), |
e8c124fe DM |
625 | TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count), |
626 | TaskState::Error { message, .. } => write!(f, "{}", message), | |
627 | } | |
628 | } | |
629 | } | |
630 | ||
/// Task details including parsed UPID
///
/// If there is no `state`, the task is still running.
#[derive(Debug)]
pub struct TaskListInfo {
    /// The parsed UPID
    pub upid: UPID,
    /// UPID string representation (as read from / written to the task list files)
    pub upid_str: String,
    /// Final task state (including its end time) if already finished
    pub state: Option<TaskState>, // endtime, status
}
643 | ||
644 | fn render_task_line(info: &TaskListInfo) -> String { | |
645 | let mut raw = String::new(); | |
646 | if let Some(status) = &info.state { | |
2bda552b TL |
647 | raw.push_str(&format!( |
648 | "{} {:08X} {}\n", | |
649 | info.upid_str, | |
650 | status.endtime(), | |
651 | status | |
652 | )); | |
e8c124fe DM |
653 | } else { |
654 | raw.push_str(&info.upid_str); | |
655 | raw.push('\n'); | |
656 | } | |
657 | ||
658 | raw | |
659 | } | |
660 | ||
661 | fn render_task_list(list: &[TaskListInfo]) -> String { | |
662 | let mut raw = String::new(); | |
663 | for info in list { | |
647a0db8 | 664 | raw.push_str(&render_task_line(info)); |
e8c124fe DM |
665 | } |
666 | raw | |
667 | } | |
668 | ||
669 | // note this is not locked, caller has to make sure it is | |
670 | // this will skip (and log) lines that are not valid status lines | |
2bda552b | 671 | fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error> { |
e8c124fe DM |
672 | let reader = BufReader::new(reader); |
673 | let mut list = Vec::new(); | |
674 | for line in reader.lines() { | |
675 | let line = line?; | |
676 | match parse_worker_status_line(&line) { | |
677 | Ok((upid_str, upid, state)) => list.push(TaskListInfo { | |
678 | upid_str, | |
679 | upid, | |
2bda552b | 680 | state, |
e8c124fe DM |
681 | }), |
682 | Err(err) => { | |
9cb2c97c | 683 | log::warn!("unable to parse worker status '{}' - {}", line, err); |
e8c124fe DM |
684 | continue; |
685 | } | |
686 | }; | |
687 | } | |
688 | ||
689 | Ok(list) | |
690 | } | |
691 | ||
692 | // note this is not locked, caller has to make sure it is | |
693 | fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error> | |
694 | where | |
695 | P: AsRef<std::path::Path> + std::fmt::Debug, | |
696 | { | |
697 | let file = match File::open(&path) { | |
698 | Ok(f) => f, | |
699 | Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()), | |
700 | Err(err) => bail!("unable to open task list {:?} - {}", path, err), | |
701 | }; | |
702 | ||
703 | read_task_file(file) | |
704 | } | |
705 | ||
/// Iterate over existing/active worker tasks
pub struct TaskListInfoIterator {
    // entries of the file currently being iterated, consumed from the back
    list: VecDeque<TaskListInfo>,
    // set once nothing is left to read
    end: bool,
    // remaining task archive files (None when only active tasks are iterated)
    archive: Option<LogRotateFiles>,
    // shared task list lock, held while archive files are still being read
    lock: Option<TaskListLockGuard>,
}
713 | ||
714 | impl TaskListInfoIterator { | |
9cb2c97c | 715 | /// Creates a new iterator instance. |
e8c124fe | 716 | pub fn new(active_only: bool) -> Result<Self, Error> { |
e8c124fe DM |
717 | let setup = worker_task_setup()?; |
718 | ||
719 | let (read_lock, active_list) = { | |
720 | let lock = setup.lock_task_list_files(false)?; | |
721 | let active_list = read_task_file_from_path(&setup.active_tasks_fn)?; | |
722 | ||
723 | let needs_update = active_list | |
724 | .iter() | |
725 | .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid)); | |
726 | ||
727 | // TODO remove with 1.x | |
728 | let index_exists = setup.task_index_fn.is_file(); | |
729 | ||
730 | if needs_update || index_exists { | |
731 | drop(lock); | |
732 | setup.update_active_workers(None)?; | |
733 | let lock = setup.lock_task_list_files(false)?; | |
734 | let active_list = read_task_file_from_path(&setup.active_tasks_fn)?; | |
735 | (lock, active_list) | |
736 | } else { | |
737 | (lock, active_list) | |
738 | } | |
739 | }; | |
740 | ||
741 | let archive = if active_only { | |
742 | None | |
743 | } else { | |
ef69d1ae | 744 | let logrotate = LogRotate::new(&setup.task_archive_fn, true, None, None)?; |
e8c124fe DM |
745 | Some(logrotate.files()) |
746 | }; | |
747 | ||
748 | let lock = if active_only { None } else { Some(read_lock) }; | |
749 | ||
750 | Ok(Self { | |
751 | list: active_list.into(), | |
752 | end: active_only, | |
753 | archive, | |
754 | lock, | |
755 | }) | |
756 | } | |
757 | } | |
758 | ||
759 | impl Iterator for TaskListInfoIterator { | |
760 | type Item = Result<TaskListInfo, Error>; | |
761 | ||
762 | fn next(&mut self) -> Option<Self::Item> { | |
763 | loop { | |
764 | if let Some(element) = self.list.pop_back() { | |
765 | return Some(Ok(element)); | |
766 | } else if self.end { | |
2bda552b | 767 | return None; |
e8c124fe DM |
768 | } else { |
769 | if let Some(mut archive) = self.archive.take() { | |
770 | if let Some(file) = archive.next() { | |
771 | let list = match read_task_file(file) { | |
772 | Ok(list) => list, | |
773 | Err(err) => return Some(Err(err)), | |
774 | }; | |
775 | self.list.append(&mut list.into()); | |
776 | self.archive = Some(archive); | |
777 | continue; | |
778 | } | |
779 | } | |
780 | ||
781 | self.end = true; | |
782 | self.lock.take(); | |
783 | } | |
784 | } | |
785 | } | |
786 | } | |
787 | ||
/// Launch long running worker tasks.
///
/// A worker task can either be a whole thread, or a simple tokio
/// task/future. Each task can `log()` messages, which are stored
/// persistently to files. Tasks should poll the `abort_requested`
/// flag, and stop execution when requested.
pub struct WorkerTask {
    // global setup (paths, file options) this task belongs to
    setup: &'static WorkerTaskSetup,
    // unique task identifier, also used for the log file name
    upid: UPID,
    // mutable per-task state (logger, progress, warnings, abort listeners)
    data: Mutex<WorkerTaskData>,
    // set when an abort was requested
    abort_requested: AtomicBool,
}
800 | ||
impl std::fmt::Display for WorkerTask {
    // A worker is displayed as its UPID (delegates to the UPID's `fmt`).
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        self.upid.fmt(f)
    }
}
806 | ||
// Mutable state of a `WorkerTask`, guarded by `WorkerTask::data`.
struct WorkerTaskData {
    // writes the task log file (opened in `WorkerTask::new`)
    logger: FileLogger,
    progress: f64, // 0..1
    warn_count: u64,
    // NOTE(review): `pub` has no effect on a private struct's visibility outside this module
    pub abort_listeners: Vec<oneshot::Sender<()>>,
}
813 | ||
814 | impl WorkerTask { | |
e8c124fe DM |
815 | pub fn new( |
816 | worker_type: &str, | |
817 | worker_id: Option<String>, | |
818 | auth_id: String, | |
819 | to_stdout: bool, | |
820 | ) -> Result<Arc<Self>, Error> { | |
e8c124fe DM |
821 | let setup = worker_task_setup()?; |
822 | ||
823 | let upid = UPID::new(worker_type, worker_id, auth_id)?; | |
824 | let task_id = upid.task_id; | |
825 | ||
826 | let mut path = setup.taskdir.clone(); | |
827 | ||
828 | path.push(format!("{:02X}", upid.pstart & 255)); | |
829 | ||
2bda552b TL |
830 | let dir_opts = setup |
831 | .file_opts | |
832 | .clone() | |
e8c124fe DM |
833 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); |
834 | ||
835 | create_path(&path, None, Some(dir_opts))?; | |
836 | ||
837 | path.push(upid.to_string()); | |
838 | ||
839 | let logger_options = FileLogOptions { | |
840 | to_stdout, | |
841 | exclusive: true, | |
842 | prefix_time: true, | |
843 | read: true, | |
844 | file_opts: setup.file_opts.clone(), | |
845 | ..Default::default() | |
846 | }; | |
847 | let logger = FileLogger::new(&path, logger_options)?; | |
848 | ||
849 | let worker = Arc::new(Self { | |
850 | setup, | |
851 | upid: upid.clone(), | |
852 | abort_requested: AtomicBool::new(false), | |
853 | data: Mutex::new(WorkerTaskData { | |
854 | logger, | |
855 | progress: 0.0, | |
856 | warn_count: 0, | |
857 | abort_listeners: vec![], | |
858 | }), | |
859 | }); | |
860 | ||
861 | // scope to drop the lock again after inserting | |
862 | { | |
863 | let mut hash = WORKER_TASK_LIST.lock().unwrap(); | |
864 | hash.insert(task_id, worker.clone()); | |
865 | crate::set_worker_count(hash.len()); | |
866 | } | |
867 | ||
868 | setup.update_active_workers(Some(&upid))?; | |
869 | ||
870 | Ok(worker) | |
871 | } | |
872 | ||
873 | /// Spawn a new tokio task/future. | |
874 | pub fn spawn<F, T>( | |
875 | worker_type: &str, | |
876 | worker_id: Option<String>, | |
877 | auth_id: String, | |
878 | to_stdout: bool, | |
879 | f: F, | |
880 | ) -> Result<String, Error> | |
2bda552b TL |
881 | where |
882 | F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T, | |
883 | T: Send + 'static + Future<Output = Result<(), Error>>, | |
e8c124fe DM |
884 | { |
885 | let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?; | |
886 | let upid_str = worker.upid.to_string(); | |
887 | let f = f(worker.clone()); | |
888 | tokio::spawn(async move { | |
889 | let result = f.await; | |
890 | worker.log_result(&result); | |
891 | }); | |
892 | ||
893 | Ok(upid_str) | |
894 | } | |
895 | ||
896 | /// Create a new worker thread. | |
897 | pub fn new_thread<F>( | |
898 | worker_type: &str, | |
899 | worker_id: Option<String>, | |
900 | auth_id: String, | |
901 | to_stdout: bool, | |
902 | f: F, | |
903 | ) -> Result<String, Error> | |
2bda552b TL |
904 | where |
905 | F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>, | |
e8c124fe DM |
906 | { |
907 | let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?; | |
908 | let upid_str = worker.upid.to_string(); | |
909 | ||
2bda552b TL |
910 | let _child = std::thread::Builder::new() |
911 | .name(upid_str.clone()) | |
912 | .spawn(move || { | |
913 | let worker1 = worker.clone(); | |
914 | let result = match std::panic::catch_unwind(move || f(worker1)) { | |
915 | Ok(r) => r, | |
916 | Err(panic) => match panic.downcast::<&str>() { | |
917 | Ok(panic_msg) => Err(format_err!("worker panicked: {}", panic_msg)), | |
918 | Err(_) => Err(format_err!("worker panicked: unknown type.")), | |
919 | }, | |
920 | }; | |
921 | ||
922 | worker.log_result(&result); | |
923 | }); | |
e8c124fe DM |
924 | |
925 | Ok(upid_str) | |
926 | } | |
927 | ||
928 | /// create state from self and a result | |
929 | pub fn create_state(&self, result: &Result<(), Error>) -> TaskState { | |
930 | let warn_count = self.data.lock().unwrap().warn_count; | |
931 | ||
09046671 | 932 | let endtime = proxmox_time::epoch_i64(); |
e8c124fe DM |
933 | |
934 | if let Err(err) = result { | |
2bda552b TL |
935 | TaskState::Error { |
936 | message: err.to_string(), | |
937 | endtime, | |
938 | } | |
e8c124fe | 939 | } else if warn_count > 0 { |
2bda552b TL |
940 | TaskState::Warning { |
941 | count: warn_count, | |
942 | endtime, | |
943 | } | |
e8c124fe DM |
944 | } else { |
945 | TaskState::OK { endtime } | |
946 | } | |
947 | } | |
948 | ||
949 | /// Log task result, remove task from running list | |
950 | pub fn log_result(&self, result: &Result<(), Error>) { | |
951 | let state = self.create_state(result); | |
020c8e69 | 952 | self.log_message(state.result_text()); |
e8c124fe DM |
953 | |
954 | WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id); | |
955 | let _ = self.setup.update_active_workers(None); | |
956 | crate::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len()); | |
957 | } | |
958 | ||
959 | /// Log a message. | |
020c8e69 | 960 | pub fn log_message<S: AsRef<str>>(&self, msg: S) { |
e8c124fe DM |
961 | let mut data = self.data.lock().unwrap(); |
962 | data.logger.log(msg); | |
963 | } | |
964 | ||
965 | /// Log a message as warning. | |
020c8e69 | 966 | pub fn log_warning<S: AsRef<str>>(&self, msg: S) { |
e8c124fe DM |
967 | let mut data = self.data.lock().unwrap(); |
968 | data.logger.log(format!("WARN: {}", msg.as_ref())); | |
969 | data.warn_count += 1; | |
970 | } | |
971 | ||
972 | /// Set progress indicator | |
973 | pub fn progress(&self, progress: f64) { | |
974 | if progress >= 0.0 && progress <= 1.0 { | |
975 | let mut data = self.data.lock().unwrap(); | |
976 | data.progress = progress; | |
977 | } else { | |
2bda552b | 978 | // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress); |
e8c124fe DM |
979 | } |
980 | } | |
981 | ||
982 | /// Request abort | |
983 | pub fn request_abort(&self) { | |
e8c124fe | 984 | let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst); |
2bda552b TL |
985 | if !prev_abort { |
986 | // log abort one time | |
bb7018e1 | 987 | self.log_message("received abort request ...".to_string()); |
e8c124fe DM |
988 | } |
989 | // noitify listeners | |
990 | let mut data = self.data.lock().unwrap(); | |
991 | loop { | |
992 | match data.abort_listeners.pop() { | |
2bda552b TL |
993 | None => { |
994 | break; | |
995 | } | |
e8c124fe DM |
996 | Some(ch) => { |
997 | let _ = ch.send(()); // ignore errors here | |
2bda552b | 998 | } |
e8c124fe DM |
999 | } |
1000 | } | |
1001 | } | |
1002 | ||
e8c124fe | 1003 | /// Get a future which resolves on task abort |
2bda552b | 1004 | pub fn abort_future(&self) -> oneshot::Receiver<()> { |
e8c124fe DM |
1005 | let (tx, rx) = oneshot::channel::<()>(); |
1006 | ||
1007 | let mut data = self.data.lock().unwrap(); | |
1008 | if self.abort_requested() { | |
1009 | let _ = tx.send(()); | |
1010 | } else { | |
1011 | data.abort_listeners.push(tx); | |
1012 | } | |
1013 | rx | |
1014 | } | |
1015 | ||
1016 | pub fn upid(&self) -> &UPID { | |
1017 | &self.upid | |
1018 | } | |
1019 | } | |
1020 | ||
7a4bb600 | 1021 | impl WorkerTaskContext for WorkerTask { |
42dae7e1 DM |
1022 | fn abort_requested(&self) -> bool { |
1023 | self.abort_requested.load(Ordering::SeqCst) | |
e8c124fe DM |
1024 | } |
1025 | ||
6e50c7aa DM |
1026 | fn shutdown_requested(&self) -> bool { |
1027 | crate::shutdown_requested() | |
1028 | } | |
1029 | ||
1030 | fn fail_on_shutdown(&self) -> Result<(), Error> { | |
1031 | crate::fail_on_shutdown() | |
1032 | } | |
1033 | ||
e8c124fe DM |
1034 | fn log(&self, level: log::Level, message: &std::fmt::Arguments) { |
1035 | match level { | |
020c8e69 DM |
1036 | log::Level::Error => self.log_warning(&message.to_string()), |
1037 | log::Level::Warn => self.log_warning(&message.to_string()), | |
1038 | log::Level::Info => self.log_message(&message.to_string()), | |
1039 | log::Level::Debug => self.log_message(&format!("DEBUG: {}", message)), | |
1040 | log::Level::Trace => self.log_message(&format!("TRACE: {}", message)), | |
e8c124fe DM |
1041 | } |
1042 | } | |
1043 | } | |
1044 | ||
1045 | /// Wait for a locally spanned worker task | |
1046 | /// | |
1047 | /// Note: local workers should print logs to stdout, so there is no | |
1048 | /// need to fetch/display logs. We just wait for the worker to finish. | |
1049 | pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> { | |
e8c124fe DM |
1050 | let upid: UPID = upid_str.parse()?; |
1051 | ||
1052 | let sleep_duration = core::time::Duration::new(0, 100_000_000); | |
1053 | ||
1054 | loop { | |
1055 | if worker_is_active_local(&upid) { | |
1056 | tokio::time::sleep(sleep_duration).await; | |
1057 | } else { | |
1058 | break; | |
1059 | } | |
1060 | } | |
1061 | Ok(()) | |
1062 | } | |
1063 | ||
1064 | /// Request abort of a local worker (if existing and running) | |
1065 | pub fn abort_local_worker(upid: UPID) { | |
647a0db8 | 1066 | if let Some(worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) { |
e8c124fe DM |
1067 | worker.request_abort(); |
1068 | } | |
1069 | } |