]>
Commit | Line | Data |
---|---|---|
e7244387 | 1 | use std::collections::{HashMap, VecDeque}; |
4b01c983 | 2 | use std::fs::File; |
0a33fba4 | 3 | use std::path::PathBuf; |
5ade6c25 | 4 | use std::io::{Read, Write, BufRead, BufReader}; |
d3f4c08f | 5 | use std::panic::UnwindSafe; |
18c0df4c WB |
6 | use std::sync::atomic::{AtomicBool, Ordering}; |
7 | use std::sync::{Arc, Mutex}; | |
d3f4c08f | 8 | |
f7d4e4b5 | 9 | use anyhow::{bail, format_err, Error}; |
18c0df4c WB |
10 | use futures::*; |
11 | use lazy_static::lazy_static; | |
321070b4 | 12 | use serde_json::{json, Value}; |
4c116baf | 13 | use serde::{Serialize, Deserialize}; |
18c0df4c | 14 | use tokio::sync::oneshot; |
0a33fba4 DM |
15 | use nix::fcntl::OFlag; |
16 | use once_cell::sync::OnceCell; | |
479f6e40 | 17 | |
619495b2 | 18 | use proxmox::sys::linux::procfs; |
9ea4bce4 | 19 | use proxmox::try_block; |
0a33fba4 | 20 | use proxmox::tools::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions}; |
81867f05 | 21 | use proxmox::api::upid::UPID; |
e18a6c9e | 22 | |
c8449217 | 23 | use pbs_tools::task::WorkerTaskContext; |
6c76aa43 | 24 | use pbs_tools::logrotate::{LogRotate, LogRotateFiles}; |
b9700a9f DM |
25 | |
26 | use crate::{CommandoSocket, FileLogger, FileLogOptions}; | |
6c76aa43 | 27 | |
/// Guard object returned by `WorkerTaskSetup::lock_task_list_files`.
///
/// It only owns the opened lock file; the file handle is closed when the
/// guard is dropped (presumably releasing the lock with it - confirm against
/// `open_file_locked`).
struct TaskListLockGuard(File);
af06decd | 29 | |
0a33fba4 DM |
30 | struct WorkerTaskSetup { |
31 | file_opts: CreateOptions, | |
32 | taskdir: PathBuf, | |
33 | task_lock_fn: PathBuf, | |
34 | active_tasks_fn: PathBuf, | |
35 | task_index_fn: PathBuf, | |
36 | task_archive_fn: PathBuf, | |
37 | } | |
38 | ||
39 | static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new(); | |
40 | ||
41 | fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> { | |
42 | WORKER_TASK_SETUP.get() | |
43 | .ok_or_else(|| format_err!("WorkerTask library is not initialized")) | |
44 | } | |
45 | ||
46 | impl WorkerTaskSetup { | |
47 | ||
48 | fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self { | |
49 | ||
50 | let mut taskdir = basedir.clone(); | |
51 | taskdir.push("tasks"); | |
52 | ||
53 | let mut task_lock_fn = taskdir.clone(); | |
54 | task_lock_fn.push(".active.lock"); | |
55 | ||
56 | let mut active_tasks_fn = taskdir.clone(); | |
57 | active_tasks_fn.push("active"); | |
58 | ||
59 | let mut task_index_fn = taskdir.clone(); | |
60 | task_index_fn.push("index"); | |
61 | ||
62 | let mut task_archive_fn = taskdir.clone(); | |
63 | task_archive_fn.push("archive"); | |
64 | ||
65 | Self { | |
66 | file_opts, | |
67 | taskdir, | |
68 | task_lock_fn, | |
69 | active_tasks_fn, | |
70 | task_index_fn, | |
71 | task_archive_fn, | |
72 | } | |
73 | } | |
74 | ||
75 | fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> { | |
76 | let options = self.file_opts.clone() | |
77 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); | |
78 | ||
79 | let timeout = std::time::Duration::new(10, 0); | |
80 | ||
81 | let file = proxmox::tools::fs::open_file_locked( | |
82 | &self.task_lock_fn, | |
83 | timeout, | |
84 | exclusive, | |
85 | options, | |
86 | )?; | |
87 | ||
88 | Ok(TaskListLockGuard(file)) | |
89 | } | |
90 | ||
91 | fn log_path(&self, upid: &UPID) -> std::path::PathBuf { | |
92 | let mut path = self.taskdir.clone(); | |
93 | path.push(format!("{:02X}", upid.pstart % 256)); | |
94 | path.push(upid.to_string()); | |
95 | path | |
96 | } | |
97 | ||
98 | // atomically read/update the task list, update status of finished tasks | |
99 | // new_upid is added to the list when specified. | |
100 | fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> { | |
101 | ||
102 | let lock = self.lock_task_list_files(true)?; | |
103 | ||
104 | // TODO remove with 1.x | |
105 | let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(&self.task_index_fn)?; | |
106 | let had_index_file = !finish_list.is_empty(); | |
107 | ||
108 | // We use filter_map because one negative case wants to *move* the data into `finish_list`, | |
109 | // clippy doesn't quite catch this! | |
110 | #[allow(clippy::unnecessary_filter_map)] | |
111 | let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(&self.active_tasks_fn)? | |
112 | .into_iter() | |
113 | .filter_map(|info| { | |
114 | if info.state.is_some() { | |
115 | // this can happen when the active file still includes finished tasks | |
116 | finish_list.push(info); | |
117 | return None; | |
118 | } | |
119 | ||
120 | if !worker_is_active_local(&info.upid) { | |
121 | // println!("Detected stopped task '{}'", &info.upid_str); | |
122 | let now = proxmox::tools::time::epoch_i64(); | |
123 | let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now }); | |
124 | finish_list.push(TaskListInfo { | |
125 | upid: info.upid, | |
126 | upid_str: info.upid_str, | |
127 | state: Some(status) | |
128 | }); | |
129 | return None; | |
130 | } | |
131 | ||
132 | Some(info) | |
133 | }).collect(); | |
134 | ||
135 | if let Some(upid) = new_upid { | |
136 | active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None }); | |
137 | } | |
138 | ||
139 | let active_raw = render_task_list(&active_list); | |
140 | ||
141 | let options = self.file_opts.clone() | |
142 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); | |
143 | ||
144 | replace_file( | |
145 | &self.active_tasks_fn, | |
146 | active_raw.as_bytes(), | |
147 | options, | |
148 | )?; | |
149 | ||
150 | finish_list.sort_unstable_by(|a, b| { | |
151 | match (&a.state, &b.state) { | |
152 | (Some(s1), Some(s2)) => s1.cmp(&s2), | |
153 | (Some(_), None) => std::cmp::Ordering::Less, | |
154 | (None, Some(_)) => std::cmp::Ordering::Greater, | |
155 | _ => a.upid.starttime.cmp(&b.upid.starttime), | |
156 | } | |
157 | }); | |
158 | ||
159 | if !finish_list.is_empty() { | |
160 | let options = self.file_opts.clone() | |
161 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o660)); | |
162 | ||
163 | let mut writer = atomic_open_or_create_file( | |
164 | &self.task_archive_fn, | |
165 | OFlag::O_APPEND | OFlag::O_RDWR, | |
166 | &[], | |
167 | options, | |
168 | )?; | |
169 | for info in &finish_list { | |
170 | writer.write_all(render_task_line(&info).as_bytes())?; | |
171 | } | |
172 | } | |
173 | ||
174 | // TODO Remove with 1.x | |
175 | // for compatibility, if we had an INDEX file, we do not need it anymore | |
176 | if had_index_file { | |
177 | let _ = nix::unistd::unlink(&self.task_index_fn); | |
178 | } | |
179 | ||
180 | drop(lock); | |
181 | ||
182 | Ok(()) | |
183 | } | |
184 | ||
185 | // Create task log directory with correct permissions | |
186 | fn create_task_log_dirs(&self) -> Result<(), Error> { | |
187 | ||
188 | try_block!({ | |
189 | let dir_opts = self.file_opts.clone() | |
190 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); | |
191 | ||
192 | create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts.clone()))?; | |
193 | // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?; | |
194 | Ok(()) | |
195 | }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err)) | |
196 | } | |
197 | } | |
198 | ||
199 | /// Initialize the WorkerTask library | |
200 | pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> { | |
201 | let setup = WorkerTaskSetup::new(basedir, file_opts); | |
202 | setup.create_task_log_dirs()?; | |
203 | WORKER_TASK_SETUP.set(setup) | |
204 | .map_err(|_| format_err!("init_worker_tasks failed - already initialized")) | |
205 | } | |
206 | ||
207 | /// checks if the Task Archive is bigger that 'size_threshold' bytes, and | |
208 | /// rotates it if it is | |
209 | pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> { | |
210 | ||
211 | let setup = worker_task_setup()?; | |
212 | ||
213 | let _lock = setup.lock_task_list_files(true)?; | |
214 | ||
215 | let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress) | |
216 | .ok_or_else(|| format_err!("could not get archive file names"))?; | |
217 | ||
218 | logrotate.rotate(size_threshold, None, max_files) | |
219 | } | |
220 | ||
221 | ||
222 | /// Path to the worker log file | |
223 | pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> { | |
224 | let setup = worker_task_setup()?; | |
225 | Ok(setup.log_path(upid)) | |
226 | } | |
227 | ||
228 | /// Read endtime (time of last log line) and exitstatus from task log file | |
229 | /// If there is not a single line with at valid datetime, we assume the | |
230 | /// starttime to be the endtime | |
231 | pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> { | |
232 | ||
233 | let setup = worker_task_setup()?; | |
234 | ||
235 | let mut status = TaskState::Unknown { endtime: upid.starttime }; | |
236 | ||
237 | let path = setup.log_path(upid); | |
238 | ||
239 | let mut file = File::open(path)?; | |
240 | ||
241 | /// speedup - only read tail | |
242 | use std::io::Seek; | |
243 | use std::io::SeekFrom; | |
244 | let _ = file.seek(SeekFrom::End(-8192)); // ignore errors | |
245 | ||
246 | let mut data = Vec::with_capacity(8192); | |
247 | file.read_to_end(&mut data)?; | |
248 | ||
249 | // strip newlines at the end of the task logs | |
250 | while data.last() == Some(&b'\n') { | |
251 | data.pop(); | |
252 | } | |
253 | ||
254 | let last_line = match data.iter().rposition(|c| *c == b'\n') { | |
255 | Some(start) if data.len() > (start+1) => &data[start+1..], | |
256 | Some(_) => &data, // should not happen, since we removed all trailing newlines | |
257 | None => &data, | |
258 | }; | |
259 | ||
260 | let last_line = std::str::from_utf8(last_line) | |
261 | .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?; | |
262 | ||
263 | let mut iter = last_line.splitn(2, ": "); | |
264 | if let Some(time_str) = iter.next() { | |
265 | if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) { | |
266 | // set the endtime even if we cannot parse the state | |
267 | status = TaskState::Unknown { endtime }; | |
268 | if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) { | |
269 | if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) { | |
270 | status = state; | |
271 | } | |
272 | } | |
273 | } | |
274 | } | |
275 | ||
276 | Ok(status) | |
346a488e | 277 | } |
784fa1c2 | 278 | |
479f6e40 DM |
279 | lazy_static! { |
280 | static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new()); | |
a68768cf | 281 | } |
d607b886 | 282 | |
a68768cf TL |
283 | /// checks if the task UPID refers to a worker from this process |
284 | fn is_local_worker(upid: &UPID) -> bool { | |
b9700a9f | 285 | upid.pid == crate::pid() && upid.pstart == crate::pstart() |
479f6e40 DM |
286 | } |
287 | ||
634132fe | 288 | /// Test if the task is still running |
5751e495 | 289 | pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> { |
a68768cf | 290 | if is_local_worker(upid) { |
5751e495 DM |
291 | return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)); |
292 | } | |
293 | ||
3984a5fd | 294 | if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() { |
5751e495 DM |
295 | return Ok(false); |
296 | } | |
297 | ||
b9700a9f | 298 | let sock = crate::ctrl_sock_from_pid(upid.pid); |
5751e495 | 299 | let cmd = json!({ |
a68768cf | 300 | "command": "worker-task-status", |
385681c9 TL |
301 | "args": { |
302 | "upid": upid.to_string(), | |
303 | }, | |
5751e495 | 304 | }); |
b9700a9f | 305 | let status = crate::send_command(sock, &cmd).await?; |
4494d078 | 306 | |
5751e495 DM |
307 | if let Some(active) = status.as_bool() { |
308 | Ok(active) | |
309 | } else { | |
310 | bail!("got unexpected result {:?} (expected bool)", status); | |
311 | } | |
312 | } | |
313 | ||
314 | /// Test if the task is still running (fast but inaccurate implementation) | |
315 | /// | |
a68768cf | 316 | /// If the task is spawned from a different process, we simply return if |
5751e495 DM |
317 | /// that process is still running. This information is good enough to detect |
318 | /// stale tasks... | |
77ebbefc | 319 | pub fn worker_is_active_local(upid: &UPID) -> bool { |
a68768cf | 320 | if is_local_worker(upid) { |
62ee2eb4 | 321 | WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) |
634132fe | 322 | } else { |
62ee2eb4 | 323 | procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() |
479f6e40 DM |
324 | } |
325 | } | |
326 | ||
2e44983a DM |
327 | /// Register task control command on a [CommandoSocket]. |
328 | /// | |
329 | /// This create two commands: | |
330 | /// | |
331 | /// * ``worker-task-abort <UPID>``: calls [abort_local_worker] | |
332 | /// | |
333 | /// * ``worker-task-status <UPID>``: return true of false, depending on | |
334 | /// whether the worker is running or stopped. | |
a68768cf | 335 | pub fn register_task_control_commands( |
fd6d2438 | 336 | commando_sock: &mut CommandoSocket, |
a68768cf TL |
337 | ) -> Result<(), Error> { |
338 | fn get_upid(args: Option<&Value>) -> Result<UPID, Error> { | |
339 | let args = if let Some(args) = args { args } else { bail!("missing args") }; | |
340 | let upid = match args.get("upid") { | |
341 | Some(Value::String(upid)) => upid.parse::<UPID>()?, | |
342 | None => bail!("no upid in args"), | |
343 | _ => bail!("unable to parse upid"), | |
344 | }; | |
345 | if !is_local_worker(&upid) { | |
d607b886 DM |
346 | bail!("upid does not belong to this process"); |
347 | } | |
a68768cf TL |
348 | Ok(upid) |
349 | } | |
d607b886 | 350 | |
a68768cf TL |
351 | commando_sock.register_command("worker-task-abort".into(), move |args| { |
352 | let upid = get_upid(args)?; | |
353 | ||
3f742f95 DC |
354 | abort_local_worker(upid); |
355 | ||
a68768cf | 356 | Ok(Value::Null) |
d607b886 | 357 | })?; |
a68768cf TL |
358 | commando_sock.register_command("worker-task-status".into(), move |args| { |
359 | let upid = get_upid(args)?; | |
d607b886 | 360 | |
a68768cf TL |
361 | let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id); |
362 | ||
363 | Ok(active.into()) | |
364 | })?; | |
d607b886 DM |
365 | |
366 | Ok(()) | |
367 | } | |
368 | ||
2e44983a DM |
369 | /// Try to abort a worker task, but do no wait |
370 | /// | |
371 | /// Errors (if any) are simply logged. | |
372 | pub fn abort_worker_nowait(upid: UPID) { | |
75fef4b4 WB |
373 | tokio::spawn(async move { |
374 | if let Err(err) = abort_worker(upid).await { | |
2e44983a | 375 | log::error!("abort worker task failed - {}", err); |
321070b4 | 376 | } |
75fef4b4 | 377 | }); |
321070b4 DM |
378 | } |
379 | ||
2e44983a DM |
380 | /// Abort a worker task |
381 | /// | |
382 | /// By sending ``worker-task-abort`` to the control socket. | |
5751e495 | 383 | pub async fn abort_worker(upid: UPID) -> Result<(), Error> { |
321070b4 | 384 | |
b9700a9f | 385 | let sock = crate::ctrl_sock_from_pid(upid.pid); |
321070b4 | 386 | let cmd = json!({ |
a68768cf | 387 | "command": "worker-task-abort", |
385681c9 TL |
388 | "args": { |
389 | "upid": upid.to_string(), | |
390 | }, | |
321070b4 | 391 | }); |
b9700a9f | 392 | crate::send_command(sock, &cmd).map_ok(|_| ()).await |
321070b4 DM |
393 | } |
394 | ||
77bd2a46 | 395 | fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> { |
4b01c983 DM |
396 | |
397 | let data = line.splitn(3, ' ').collect::<Vec<&str>>(); | |
398 | ||
399 | let len = data.len(); | |
400 | ||
401 | match len { | |
402 | 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)), | |
403 | 3 => { | |
404 | let endtime = i64::from_str_radix(data[1], 16)?; | |
77bd2a46 DC |
405 | let state = TaskState::from_endtime_and_message(endtime, data[2])?; |
406 | Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state))) | |
4b01c983 DM |
407 | } |
408 | _ => bail!("wrong number of components"), | |
409 | } | |
410 | } | |
411 | ||
4c116baf | 412 | /// Task State |
77bd2a46 | 413 | #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] |
4c116baf DC |
414 | pub enum TaskState { |
415 | /// The Task ended with an undefined state | |
77bd2a46 | 416 | Unknown { endtime: i64 }, |
4c116baf | 417 | /// The Task ended and there were no errors or warnings |
77bd2a46 | 418 | OK { endtime: i64 }, |
4c116baf | 419 | /// The Task had 'count' amount of warnings and no errors |
77bd2a46 | 420 | Warning { count: u64, endtime: i64 }, |
4c116baf | 421 | /// The Task ended with the error described in 'message' |
77bd2a46 | 422 | Error { message: String, endtime: i64 }, |
4c116baf DC |
423 | } |
424 | ||
425 | impl TaskState { | |
77bd2a46 DC |
426 | pub fn endtime(&self) -> i64 { |
427 | match *self { | |
428 | TaskState::Unknown { endtime } => endtime, | |
429 | TaskState::OK { endtime } => endtime, | |
430 | TaskState::Warning { endtime, .. } => endtime, | |
431 | TaskState::Error { endtime, .. } => endtime, | |
4c116baf DC |
432 | } |
433 | } | |
4c116baf | 434 | |
77bd2a46 | 435 | fn result_text(&self) -> String { |
4c116baf | 436 | match self { |
77bd2a46 DC |
437 | TaskState::Error { message, .. } => format!("TASK ERROR: {}", message), |
438 | other => format!("TASK {}", other), | |
4c116baf DC |
439 | } |
440 | } | |
4c116baf | 441 | |
77bd2a46 | 442 | fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> { |
4c116baf | 443 | if s == "unknown" { |
77bd2a46 | 444 | Ok(TaskState::Unknown { endtime }) |
4c116baf | 445 | } else if s == "OK" { |
77bd2a46 | 446 | Ok(TaskState::OK { endtime }) |
365915da FG |
447 | } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") { |
448 | let count: u64 = warnings.parse()?; | |
77bd2a46 | 449 | Ok(TaskState::Warning{ count, endtime }) |
3984a5fd | 450 | } else if !s.is_empty() { |
365915da | 451 | let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string(); |
77bd2a46 | 452 | Ok(TaskState::Error{ message, endtime }) |
4c116baf DC |
453 | } else { |
454 | bail!("unable to parse Task Status '{}'", s); | |
455 | } | |
456 | } | |
457 | } | |
458 | ||
77bd2a46 DC |
459 | impl std::cmp::PartialOrd for TaskState { |
460 | fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { | |
461 | Some(self.endtime().cmp(&other.endtime())) | |
462 | } | |
463 | } | |
464 | ||
465 | impl std::cmp::Ord for TaskState { | |
466 | fn cmp(&self, other: &Self) -> std::cmp::Ordering { | |
467 | self.endtime().cmp(&other.endtime()) | |
468 | } | |
469 | } | |
470 | ||
471 | impl std::fmt::Display for TaskState { | |
472 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
473 | match self { | |
474 | TaskState::Unknown { .. } => write!(f, "unknown"), | |
475 | TaskState::OK { .. }=> write!(f, "OK"), | |
476 | TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count), | |
477 | TaskState::Error { message, .. } => write!(f, "{}", message), | |
478 | } | |
479 | } | |
480 | } | |
481 | ||
93aebb38 DM |
482 | /// Task details including parsed UPID |
483 | /// | |
484 | /// If there is no `state`, the task is still running. | |
485 | #[derive(Debug)] | |
486 | pub struct TaskListInfo { | |
487 | /// The parsed UPID | |
488 | pub upid: UPID, | |
489 | /// UPID string representation | |
490 | pub upid_str: String, | |
491 | /// Task `(endtime, status)` if already finished | |
77bd2a46 | 492 | pub state: Option<TaskState>, // endtime, status |
93aebb38 DM |
493 | } |
494 | ||
bbeb0256 DC |
495 | fn render_task_line(info: &TaskListInfo) -> String { |
496 | let mut raw = String::new(); | |
497 | if let Some(status) = &info.state { | |
498 | raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status)); | |
499 | } else { | |
500 | raw.push_str(&info.upid_str); | |
501 | raw.push('\n'); | |
502 | } | |
503 | ||
504 | raw | |
505 | } | |
506 | ||
507 | fn render_task_list(list: &[TaskListInfo]) -> String { | |
508 | let mut raw = String::new(); | |
509 | for info in list { | |
510 | raw.push_str(&render_task_line(&info)); | |
511 | } | |
512 | raw | |
513 | } | |
514 | ||
784fa1c2 DC |
515 | // note this is not locked, caller has to make sure it is |
516 | // this will skip (and log) lines that are not valid status lines | |
517 | fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error> | |
518 | { | |
519 | let reader = BufReader::new(reader); | |
520 | let mut list = Vec::new(); | |
521 | for line in reader.lines() { | |
522 | let line = line?; | |
523 | match parse_worker_status_line(&line) { | |
524 | Ok((upid_str, upid, state)) => list.push(TaskListInfo { | |
525 | upid_str, | |
526 | upid, | |
527 | state | |
528 | }), | |
529 | Err(err) => { | |
2e44983a | 530 | log::warn!("unable to parse worker status '{}' - {}", line, err); |
784fa1c2 DC |
531 | continue; |
532 | } | |
533 | }; | |
534 | } | |
535 | ||
536 | Ok(list) | |
537 | } | |
538 | ||
539 | // note this is not locked, caller has to make sure it is | |
540 | fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error> | |
541 | where | |
542 | P: AsRef<std::path::Path> + std::fmt::Debug, | |
543 | { | |
544 | let file = match File::open(&path) { | |
545 | Ok(f) => f, | |
546 | Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()), | |
547 | Err(err) => bail!("unable to open task list {:?} - {}", path, err), | |
548 | }; | |
549 | ||
550 | read_task_file(file) | |
551 | } | |
552 | ||
2e44983a | 553 | /// Iterate over existing/active worker tasks |
e7244387 DC |
554 | pub struct TaskListInfoIterator { |
555 | list: VecDeque<TaskListInfo>, | |
264779e7 | 556 | end: bool, |
e7244387 | 557 | archive: Option<LogRotateFiles>, |
0a33fba4 | 558 | lock: Option<TaskListLockGuard>, |
e7244387 DC |
559 | } |
560 | ||
561 | impl TaskListInfoIterator { | |
2e44983a | 562 | /// Creates a new iterator instance. |
e7244387 | 563 | pub fn new(active_only: bool) -> Result<Self, Error> { |
0a33fba4 DM |
564 | |
565 | let setup = worker_task_setup()?; | |
566 | ||
e7244387 | 567 | let (read_lock, active_list) = { |
0a33fba4 DM |
568 | let lock = setup.lock_task_list_files(false)?; |
569 | let active_list = read_task_file_from_path(&setup.active_tasks_fn)?; | |
e7244387 DC |
570 | |
571 | let needs_update = active_list | |
572 | .iter() | |
df4827f2 | 573 | .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid)); |
e7244387 | 574 | |
264779e7 | 575 | // TODO remove with 1.x |
0a33fba4 | 576 | let index_exists = setup.task_index_fn.is_file(); |
264779e7 DC |
577 | |
578 | if needs_update || index_exists { | |
e7244387 | 579 | drop(lock); |
0a33fba4 DM |
580 | setup.update_active_workers(None)?; |
581 | let lock = setup.lock_task_list_files(false)?; | |
582 | let active_list = read_task_file_from_path(&setup.active_tasks_fn)?; | |
e7244387 DC |
583 | (lock, active_list) |
584 | } else { | |
585 | (lock, active_list) | |
586 | } | |
587 | }; | |
588 | ||
589 | let archive = if active_only { | |
590 | None | |
591 | } else { | |
0a33fba4 | 592 | let logrotate = LogRotate::new(&setup.task_archive_fn, true) |
e4f5f59e | 593 | .ok_or_else(|| format_err!("could not get archive file names"))?; |
e7244387 DC |
594 | Some(logrotate.files()) |
595 | }; | |
596 | ||
e7244387 DC |
597 | let lock = if active_only { None } else { Some(read_lock) }; |
598 | ||
599 | Ok(Self { | |
600 | list: active_list.into(), | |
264779e7 | 601 | end: active_only, |
e7244387 DC |
602 | archive, |
603 | lock, | |
604 | }) | |
605 | } | |
606 | } | |
607 | ||
608 | impl Iterator for TaskListInfoIterator { | |
609 | type Item = Result<TaskListInfo, Error>; | |
610 | ||
611 | fn next(&mut self) -> Option<Self::Item> { | |
612 | loop { | |
613 | if let Some(element) = self.list.pop_back() { | |
614 | return Some(Ok(element)); | |
264779e7 DC |
615 | } else if self.end { |
616 | return None; | |
e7244387 | 617 | } else { |
264779e7 DC |
618 | if let Some(mut archive) = self.archive.take() { |
619 | if let Some(file) = archive.next() { | |
620 | let list = match read_task_file(file) { | |
621 | Ok(list) => list, | |
e7244387 DC |
622 | Err(err) => return Some(Err(err)), |
623 | }; | |
264779e7 DC |
624 | self.list.append(&mut list.into()); |
625 | self.archive = Some(archive); | |
626 | continue; | |
e7244387 | 627 | } |
e7244387 | 628 | } |
264779e7 DC |
629 | |
630 | self.end = true; | |
631 | self.lock.take(); | |
e7244387 DC |
632 | } |
633 | } | |
634 | } | |
635 | } | |
636 | ||
882594c5 DM |
637 | /// Launch long running worker tasks. |
638 | /// | |
639 | /// A worker task can either be a whole thread, or a simply tokio | |
640 | /// task/future. Each task can `log()` messages, which are stored | |
641 | /// persistently to files. Task should poll the `abort_requested` | |
642 | /// flag, and stop execution when requested. | |
479f6e40 | 643 | pub struct WorkerTask { |
0a33fba4 | 644 | setup: &'static WorkerTaskSetup, |
479f6e40 DM |
645 | upid: UPID, |
646 | data: Mutex<WorkerTaskData>, | |
647 | abort_requested: AtomicBool, | |
648 | } | |
649 | ||
650 | impl std::fmt::Display for WorkerTask { | |
651 | ||
652 | fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { | |
653 | self.upid.fmt(f) | |
654 | } | |
655 | } | |
656 | ||
479f6e40 DM |
657 | struct WorkerTaskData { |
658 | logger: FileLogger, | |
659 | progress: f64, // 0..1 | |
f6de2c73 | 660 | warn_count: u64, |
75bc49be | 661 | pub abort_listeners: Vec<oneshot::Sender<()>>, |
479f6e40 DM |
662 | } |
663 | ||
479f6e40 DM |
664 | impl WorkerTask { |
665 | ||
0a33fba4 DM |
666 | pub fn new( |
667 | worker_type: &str, | |
668 | worker_id: Option<String>, | |
669 | auth_id: String, | |
670 | to_stdout: bool, | |
671 | ) -> Result<Arc<Self>, Error> { | |
672 | ||
673 | let setup = worker_task_setup()?; | |
674 | ||
e6dc35ac | 675 | let upid = UPID::new(worker_type, worker_id, auth_id)?; |
634132fe | 676 | let task_id = upid.task_id; |
479f6e40 | 677 | |
0a33fba4 | 678 | let mut path = setup.taskdir.clone(); |
35950380 | 679 | |
7f3d9100 | 680 | path.push(format!("{:02X}", upid.pstart & 255)); |
479f6e40 | 681 | |
0a33fba4 DM |
682 | let dir_opts = setup.file_opts.clone() |
683 | .perm(nix::sys::stat::Mode::from_bits_truncate(0o755)); | |
35950380 | 684 | |
0a33fba4 | 685 | create_path(&path, None, Some(dir_opts))?; |
479f6e40 DM |
686 | |
687 | path.push(upid.to_string()); | |
688 | ||
c0df91f8 | 689 | let logger_options = FileLogOptions { |
913dddea | 690 | to_stdout, |
c0df91f8 | 691 | exclusive: true, |
e5adbc34 | 692 | prefix_time: true, |
c0df91f8 | 693 | read: true, |
0a33fba4 | 694 | file_opts: setup.file_opts.clone(), |
c0df91f8 TL |
695 | ..Default::default() |
696 | }; | |
697 | let logger = FileLogger::new(&path, logger_options)?; | |
479f6e40 DM |
698 | |
699 | let worker = Arc::new(Self { | |
0a33fba4 | 700 | setup, |
05d755b2 | 701 | upid: upid.clone(), |
479f6e40 DM |
702 | abort_requested: AtomicBool::new(false), |
703 | data: Mutex::new(WorkerTaskData { | |
704 | logger, | |
705 | progress: 0.0, | |
f6de2c73 | 706 | warn_count: 0, |
75bc49be | 707 | abort_listeners: vec![], |
479f6e40 DM |
708 | }), |
709 | }); | |
710 | ||
05d755b2 DC |
711 | // scope to drop the lock again after inserting |
712 | { | |
713 | let mut hash = WORKER_TASK_LIST.lock().unwrap(); | |
714 | hash.insert(task_id, worker.clone()); | |
b9700a9f | 715 | crate::set_worker_count(hash.len()); |
05d755b2 | 716 | } |
7a630df7 | 717 | |
0a33fba4 | 718 | setup.update_active_workers(Some(&upid))?; |
479f6e40 DM |
719 | |
720 | Ok(worker) | |
721 | } | |
722 | ||
882594c5 | 723 | /// Spawn a new tokio task/future. |
660c6846 DM |
724 | pub fn spawn<F, T>( |
725 | worker_type: &str, | |
726 | worker_id: Option<String>, | |
049a22a3 | 727 | auth_id: String, |
660c6846 DM |
728 | to_stdout: bool, |
729 | f: F, | |
730 | ) -> Result<String, Error> | |
479f6e40 | 731 | where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T, |
75fef4b4 | 732 | T: Send + 'static + Future<Output = Result<(), Error>>, |
479f6e40 | 733 | { |
e6dc35ac | 734 | let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?; |
660c6846 | 735 | let upid_str = worker.upid.to_string(); |
75fef4b4 WB |
736 | let f = f(worker.clone()); |
737 | tokio::spawn(async move { | |
738 | let result = f.await; | |
dd8e744f | 739 | worker.log_result(&result); |
75fef4b4 | 740 | }); |
479f6e40 | 741 | |
660c6846 | 742 | Ok(upid_str) |
479f6e40 DM |
743 | } |
744 | ||
882594c5 | 745 | /// Create a new worker thread. |
660c6846 DM |
746 | pub fn new_thread<F>( |
747 | worker_type: &str, | |
748 | worker_id: Option<String>, | |
049a22a3 | 749 | auth_id: String, |
660c6846 DM |
750 | to_stdout: bool, |
751 | f: F, | |
752 | ) -> Result<String, Error> | |
d3f4c08f | 753 | where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error> |
479f6e40 | 754 | { |
e6dc35ac | 755 | let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?; |
660c6846 | 756 | let upid_str = worker.upid.to_string(); |
479f6e40 | 757 | |
217170e1 | 758 | let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || { |
d3f4c08f DM |
759 | let worker1 = worker.clone(); |
760 | let result = match std::panic::catch_unwind(move || f(worker1)) { | |
761 | Ok(r) => r, | |
762 | Err(panic) => { | |
763 | match panic.downcast::<&str>() { | |
764 | Ok(panic_msg) => { | |
765 | Err(format_err!("worker panicked: {}", panic_msg)) | |
766 | } | |
767 | Err(_) => { | |
768 | Err(format_err!("worker panicked: unknown type.")) | |
769 | } | |
770 | } | |
771 | } | |
772 | }; | |
773 | ||
dd8e744f | 774 | worker.log_result(&result); |
479f6e40 DM |
775 | }); |
776 | ||
660c6846 | 777 | Ok(upid_str) |
479f6e40 DM |
778 | } |
779 | ||
4c116baf DC |
780 | /// create state from self and a result |
781 | pub fn create_state(&self, result: &Result<(), Error>) -> TaskState { | |
f6de2c73 | 782 | let warn_count = self.data.lock().unwrap().warn_count; |
cef03f41 | 783 | |
6a7be83e | 784 | let endtime = proxmox::tools::time::epoch_i64(); |
77bd2a46 | 785 | |
4b01c983 | 786 | if let Err(err) = result { |
77bd2a46 | 787 | TaskState::Error { message: err.to_string(), endtime } |
f6de2c73 | 788 | } else if warn_count > 0 { |
77bd2a46 | 789 | TaskState::Warning { count: warn_count, endtime } |
4b01c983 | 790 | } else { |
77bd2a46 | 791 | TaskState::OK { endtime } |
4b01c983 | 792 | } |
cef03f41 DC |
793 | } |
794 | ||
795 | /// Log task result, remove task from running list | |
796 | pub fn log_result(&self, result: &Result<(), Error>) { | |
4c116baf | 797 | let state = self.create_state(result); |
1ec0d70d | 798 | self.log_message(state.result_text()); |
418def7a DM |
799 | |
800 | WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id); | |
0a33fba4 | 801 | let _ = self.setup.update_active_workers(None); |
b9700a9f | 802 | crate::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len()); |
4b01c983 DM |
803 | } |
804 | ||
882594c5 | 805 | /// Log a message. |
1ec0d70d | 806 | pub fn log_message<S: AsRef<str>>(&self, msg: S) { |
479f6e40 DM |
807 | let mut data = self.data.lock().unwrap(); |
808 | data.logger.log(msg); | |
809 | } | |
810 | ||
f6de2c73 | 811 | /// Log a message as warning. |
1ec0d70d | 812 | pub fn log_warning<S: AsRef<str>>(&self, msg: S) { |
f6de2c73 DC |
813 | let mut data = self.data.lock().unwrap(); |
814 | data.logger.log(format!("WARN: {}", msg.as_ref())); | |
815 | data.warn_count += 1; | |
816 | } | |
817 | ||
882594c5 | 818 | /// Set progress indicator |
479f6e40 DM |
819 | pub fn progress(&self, progress: f64) { |
820 | if progress >= 0.0 && progress <= 1.0 { | |
821 | let mut data = self.data.lock().unwrap(); | |
822 | data.progress = progress; | |
823 | } else { | |
824 | // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress); | |
825 | } | |
826 | } | |
827 | ||
882594c5 | 828 | /// Request abort |
d607b886 | 829 | pub fn request_abort(&self) { |
a6c16894 DM |
830 | let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst); |
831 | if !prev_abort { // log abort one time | |
1ec0d70d | 832 | self.log_message(format!("received abort request ...")); |
a6c16894 | 833 | } |
75bc49be DM |
834 | // noitify listeners |
835 | let mut data = self.data.lock().unwrap(); | |
836 | loop { | |
837 | match data.abort_listeners.pop() { | |
838 | None => { break; }, | |
839 | Some(ch) => { | |
d1d74c43 | 840 | let _ = ch.send(()); // ignore errors here |
75bc49be DM |
841 | }, |
842 | } | |
843 | } | |
479f6e40 DM |
844 | } |
845 | ||
75bc49be DM |
846 | /// Get a future which resolves on task abort |
847 | pub fn abort_future(&self) -> oneshot::Receiver<()> { | |
848 | let (tx, rx) = oneshot::channel::<()>(); | |
849 | ||
850 | let mut data = self.data.lock().unwrap(); | |
851 | if self.abort_requested() { | |
852 | let _ = tx.send(()); | |
853 | } else { | |
854 | data.abort_listeners.push(tx); | |
855 | } | |
856 | rx | |
857 | } | |
4bd2a9e4 DC |
858 | |
859 | pub fn upid(&self) -> &UPID { | |
860 | &self.upid | |
861 | } | |
479f6e40 | 862 | } |
d1993187 | 863 | |
c8449217 | 864 | impl WorkerTaskContext for WorkerTask { |
619cd5cb DM |
865 | |
866 | fn abort_requested(&self) -> bool { | |
867 | self.abort_requested.load(Ordering::SeqCst) | |
d1993187 WB |
868 | } |
869 | ||
0fd55b08 DM |
870 | fn shutdown_requested(&self) -> bool { |
871 | crate::shutdown_requested() | |
872 | } | |
873 | ||
874 | fn fail_on_shutdown(&self) -> Result<(), Error> { | |
875 | crate::fail_on_shutdown() | |
876 | } | |
877 | ||
d1993187 WB |
878 | fn log(&self, level: log::Level, message: &std::fmt::Arguments) { |
879 | match level { | |
1ec0d70d DM |
880 | log::Level::Error => self.log_warning(&message.to_string()), |
881 | log::Level::Warn => self.log_warning(&message.to_string()), | |
882 | log::Level::Info => self.log_message(&message.to_string()), | |
883 | log::Level::Debug => self.log_message(&format!("DEBUG: {}", message)), | |
884 | log::Level::Trace => self.log_message(&format!("TRACE: {}", message)), | |
d1993187 WB |
885 | } |
886 | } | |
887 | } | |
72fbe9ff WB |
888 | |
889 | /// Wait for a locally spanned worker task | |
890 | /// | |
891 | /// Note: local workers should print logs to stdout, so there is no | |
892 | /// need to fetch/display logs. We just wait for the worker to finish. | |
893 | pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> { | |
894 | ||
895 | let upid: UPID = upid_str.parse()?; | |
896 | ||
897 | let sleep_duration = core::time::Duration::new(0, 100_000_000); | |
898 | ||
899 | loop { | |
900 | if worker_is_active_local(&upid) { | |
901 | tokio::time::sleep(sleep_duration).await; | |
902 | } else { | |
903 | break; | |
904 | } | |
905 | } | |
906 | Ok(()) | |
907 | } | |
3f742f95 DC |
908 | |
909 | /// Request abort of a local worker (if existing and running) | |
910 | pub fn abort_local_worker(upid: UPID) { | |
911 | if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) { | |
912 | worker.request_abort(); | |
913 | } | |
914 | } |