]>
Commit | Line | Data |
---|---|---|
e7244387 | 1 | use std::collections::{HashMap, VecDeque}; |
4b01c983 | 2 | use std::fs::File; |
5ade6c25 | 3 | use std::io::{Read, Write, BufRead, BufReader}; |
d3f4c08f | 4 | use std::panic::UnwindSafe; |
18c0df4c WB |
5 | use std::sync::atomic::{AtomicBool, Ordering}; |
6 | use std::sync::{Arc, Mutex}; | |
d3f4c08f | 7 | |
f7d4e4b5 | 8 | use anyhow::{bail, format_err, Error}; |
18c0df4c WB |
9 | use futures::*; |
10 | use lazy_static::lazy_static; | |
321070b4 | 11 | use serde_json::{json, Value}; |
4c116baf | 12 | use serde::{Serialize, Deserialize}; |
18c0df4c | 13 | use tokio::sync::oneshot; |
479f6e40 | 14 | |
619495b2 | 15 | use proxmox::sys::linux::procfs; |
9ea4bce4 | 16 | use proxmox::try_block; |
98c259b4 | 17 | use proxmox::tools::fs::{create_path, open_file_locked, replace_file, CreateOptions}; |
e18a6c9e | 18 | |
634132fe DM |
19 | use super::UPID; |
20 | ||
346a488e | 21 | use crate::buildcfg; |
a68768cf | 22 | use crate::server; |
e7244387 | 23 | use crate::tools::logrotate::{LogRotate, LogRotateFiles}; |
c0df91f8 | 24 | use crate::tools::{FileLogger, FileLogOptions}; |
fa31f4c5 | 25 | use crate::api2::types::{Authid, TaskStateType}; |
479f6e40 | 26 | |
// Build a path below the task log directory at compile time.
// `PROXMOX_BACKUP_LOG_DIR_M!()` is defined outside this file; it must expand
// to a string literal for `concat!` to accept it.
macro_rules! taskdir {
    ($subdir:expr) => (concat!(PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
}
/// Base directory of all task log files (with trailing slash).
pub const PROXMOX_BACKUP_TASK_DIR: &str = taskdir!("/");
/// Lock file guarding the task list files (active list, index, archive).
pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = taskdir!("/.active.lock");
/// List of tasks that were active when it was last written, one line per task.
pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = taskdir!("/active");
/// Legacy index of finished tasks; only read to migrate its entries, then removed.
pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = taskdir!("/index");
/// Append-only archive of finished tasks (rotated by `rotate_task_log_archive`).
pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive");

lazy_static! {
    // Workers started by this process, keyed by `UPID::task_id`.
    static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
}
d607b886 | 39 | |
a68768cf TL |
40 | /// checks if the task UPID refers to a worker from this process |
41 | fn is_local_worker(upid: &UPID) -> bool { | |
42 | upid.pid == server::pid() && upid.pstart == server::pstart() | |
479f6e40 DM |
43 | } |
44 | ||
634132fe | 45 | /// Test if the task is still running |
5751e495 | 46 | pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> { |
a68768cf | 47 | if is_local_worker(upid) { |
5751e495 DM |
48 | return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)); |
49 | } | |
50 | ||
3984a5fd | 51 | if procfs::check_process_running_pstart(upid.pid, upid.pstart).is_none() { |
5751e495 DM |
52 | return Ok(false); |
53 | } | |
54 | ||
a68768cf | 55 | let sock = server::ctrl_sock_from_pid(upid.pid); |
5751e495 | 56 | let cmd = json!({ |
a68768cf | 57 | "command": "worker-task-status", |
385681c9 TL |
58 | "args": { |
59 | "upid": upid.to_string(), | |
60 | }, | |
5751e495 | 61 | }); |
45b8a032 | 62 | let status = super::send_command(sock, &cmd).await?; |
4494d078 | 63 | |
5751e495 DM |
64 | if let Some(active) = status.as_bool() { |
65 | Ok(active) | |
66 | } else { | |
67 | bail!("got unexpected result {:?} (expected bool)", status); | |
68 | } | |
69 | } | |
70 | ||
71 | /// Test if the task is still running (fast but inaccurate implementation) | |
72 | /// | |
a68768cf | 73 | /// If the task is spawned from a different process, we simply return if |
5751e495 DM |
74 | /// that process is still running. This information is good enough to detect |
75 | /// stale tasks... | |
77ebbefc | 76 | pub fn worker_is_active_local(upid: &UPID) -> bool { |
a68768cf | 77 | if is_local_worker(upid) { |
62ee2eb4 | 78 | WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) |
634132fe | 79 | } else { |
62ee2eb4 | 80 | procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() |
479f6e40 DM |
81 | } |
82 | } | |
83 | ||
a68768cf TL |
84 | pub fn register_task_control_commands( |
85 | commando_sock: &mut super::CommandoSocket, | |
86 | ) -> Result<(), Error> { | |
87 | fn get_upid(args: Option<&Value>) -> Result<UPID, Error> { | |
88 | let args = if let Some(args) = args { args } else { bail!("missing args") }; | |
89 | let upid = match args.get("upid") { | |
90 | Some(Value::String(upid)) => upid.parse::<UPID>()?, | |
91 | None => bail!("no upid in args"), | |
92 | _ => bail!("unable to parse upid"), | |
93 | }; | |
94 | if !is_local_worker(&upid) { | |
d607b886 DM |
95 | bail!("upid does not belong to this process"); |
96 | } | |
a68768cf TL |
97 | Ok(upid) |
98 | } | |
d607b886 | 99 | |
a68768cf TL |
100 | commando_sock.register_command("worker-task-abort".into(), move |args| { |
101 | let upid = get_upid(args)?; | |
102 | ||
103 | if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) { | |
104 | worker.request_abort(); | |
d607b886 | 105 | } |
a68768cf | 106 | Ok(Value::Null) |
d607b886 | 107 | })?; |
a68768cf TL |
108 | commando_sock.register_command("worker-task-status".into(), move |args| { |
109 | let upid = get_upid(args)?; | |
d607b886 | 110 | |
a68768cf TL |
111 | let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id); |
112 | ||
113 | Ok(active.into()) | |
114 | })?; | |
d607b886 DM |
115 | |
116 | Ok(()) | |
117 | } | |
118 | ||
321070b4 | 119 | pub fn abort_worker_async(upid: UPID) { |
75fef4b4 WB |
120 | tokio::spawn(async move { |
121 | if let Err(err) = abort_worker(upid).await { | |
321070b4 DM |
122 | eprintln!("abort worker failed - {}", err); |
123 | } | |
75fef4b4 | 124 | }); |
321070b4 DM |
125 | } |
126 | ||
5751e495 | 127 | pub async fn abort_worker(upid: UPID) -> Result<(), Error> { |
321070b4 | 128 | |
a68768cf | 129 | let sock = server::ctrl_sock_from_pid(upid.pid); |
321070b4 | 130 | let cmd = json!({ |
a68768cf | 131 | "command": "worker-task-abort", |
385681c9 TL |
132 | "args": { |
133 | "upid": upid.to_string(), | |
134 | }, | |
321070b4 | 135 | }); |
45b8a032 | 136 | super::send_command(sock, &cmd).map_ok(|_| ()).await |
321070b4 DM |
137 | } |
138 | ||
77bd2a46 | 139 | fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> { |
4b01c983 DM |
140 | |
141 | let data = line.splitn(3, ' ').collect::<Vec<&str>>(); | |
142 | ||
143 | let len = data.len(); | |
144 | ||
145 | match len { | |
146 | 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)), | |
147 | 3 => { | |
148 | let endtime = i64::from_str_radix(data[1], 16)?; | |
77bd2a46 DC |
149 | let state = TaskState::from_endtime_and_message(endtime, data[2])?; |
150 | Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state))) | |
4b01c983 DM |
151 | } |
152 | _ => bail!("wrong number of components"), | |
153 | } | |
154 | } | |
155 | ||
35950380 | 156 | /// Create task log directory with correct permissions |
d607b886 | 157 | pub fn create_task_log_dirs() -> Result<(), Error> { |
35950380 DM |
158 | |
159 | try_block!({ | |
f74a03da | 160 | let backup_user = crate::backup::backup_user()?; |
35238e23 | 161 | let opts = CreateOptions::new() |
f74a03da DM |
162 | .owner(backup_user.uid) |
163 | .group(backup_user.gid); | |
35950380 | 164 | |
346a488e | 165 | create_path(buildcfg::PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?; |
35238e23 | 166 | create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?; |
346a488e | 167 | create_path(buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?; |
35950380 DM |
168 | Ok(()) |
169 | }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?; | |
170 | ||
171 | Ok(()) | |
172 | } | |
173 | ||
ae197dda DC |
174 | /// Read endtime (time of last log line) and exitstatus from task log file |
175 | /// If there is not a single line with at valid datetime, we assume the | |
176 | /// starttime to be the endtime | |
77bd2a46 | 177 | pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> { |
56b66645 DM |
178 | |
179 | let mut status = TaskState::Unknown { endtime: upid.starttime }; | |
4b01c983 | 180 | |
4494d078 | 181 | let path = upid.log_path(); |
4b01c983 | 182 | |
0bfd87bc DM |
183 | let mut file = File::open(path)?; |
184 | ||
185 | /// speedup - only read tail | |
186 | use std::io::Seek; | |
187 | use std::io::SeekFrom; | |
188 | let _ = file.seek(SeekFrom::End(-8192)); // ignore errors | |
189 | ||
56b66645 DM |
190 | let mut data = Vec::with_capacity(8192); |
191 | file.read_to_end(&mut data)?; | |
4b01c983 | 192 | |
6864fd01 DC |
193 | // strip newlines at the end of the task logs |
194 | while data.last() == Some(&b'\n') { | |
a4c11436 DC |
195 | data.pop(); |
196 | } | |
197 | ||
6864fd01 DC |
198 | let last_line = match data.iter().rposition(|c| *c == b'\n') { |
199 | Some(start) if data.len() > (start+1) => &data[start+1..], | |
200 | Some(_) => &data, // should not happen, since we removed all trailing newlines | |
201 | None => &data, | |
56b66645 DM |
202 | }; |
203 | ||
204 | let last_line = std::str::from_utf8(last_line) | |
205 | .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?; | |
206 | ||
207 | let mut iter = last_line.splitn(2, ": "); | |
208 | if let Some(time_str) = iter.next() { | |
6a7be83e | 209 | if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) { |
ef0ea4ba DC |
210 | // set the endtime even if we cannot parse the state |
211 | status = TaskState::Unknown { endtime }; | |
56b66645 | 212 | if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) { |
77bd2a46 | 213 | if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) { |
4c116baf | 214 | status = state; |
4b01c983 DM |
215 | } |
216 | } | |
217 | } | |
218 | } | |
219 | ||
77bd2a46 | 220 | Ok(status) |
4b01c983 DM |
221 | } |
222 | ||
/// Task State
///
/// Final state of a finished task. Every variant carries the task's
/// `endtime`; `WorkerTask::create_state` fills it from `epoch_i64()`.
///
/// NOTE(review): `PartialEq`/`Eq` are derived and compare all fields, while
/// the `Ord`/`PartialOrd` impls for this type compare only `endtime`. So
/// `cmp` can return `Equal` for values that are not `==`.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskState {
    /// The Task ended with an undefined state
    Unknown { endtime: i64 },
    /// The Task ended and there were no errors or warnings
    OK { endtime: i64 },
    /// The Task logged `count` warnings and no errors
    Warning { count: u64, endtime: i64 },
    /// The Task ended with the error described in `message`
    Error { message: String, endtime: i64 },
}
235 | ||
236 | impl TaskState { | |
77bd2a46 DC |
237 | pub fn endtime(&self) -> i64 { |
238 | match *self { | |
239 | TaskState::Unknown { endtime } => endtime, | |
240 | TaskState::OK { endtime } => endtime, | |
241 | TaskState::Warning { endtime, .. } => endtime, | |
242 | TaskState::Error { endtime, .. } => endtime, | |
4c116baf DC |
243 | } |
244 | } | |
4c116baf | 245 | |
fa31f4c5 DC |
246 | pub fn tasktype(&self) -> TaskStateType { |
247 | match self { | |
248 | TaskState::OK { .. } => TaskStateType::OK, | |
249 | TaskState::Unknown { .. } => TaskStateType::Unknown, | |
250 | TaskState::Error { .. } => TaskStateType::Error, | |
251 | TaskState::Warning { .. } => TaskStateType::Warning, | |
252 | } | |
253 | } | |
254 | ||
77bd2a46 | 255 | fn result_text(&self) -> String { |
4c116baf | 256 | match self { |
77bd2a46 DC |
257 | TaskState::Error { message, .. } => format!("TASK ERROR: {}", message), |
258 | other => format!("TASK {}", other), | |
4c116baf DC |
259 | } |
260 | } | |
4c116baf | 261 | |
77bd2a46 | 262 | fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> { |
4c116baf | 263 | if s == "unknown" { |
77bd2a46 | 264 | Ok(TaskState::Unknown { endtime }) |
4c116baf | 265 | } else if s == "OK" { |
77bd2a46 | 266 | Ok(TaskState::OK { endtime }) |
365915da FG |
267 | } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") { |
268 | let count: u64 = warnings.parse()?; | |
77bd2a46 | 269 | Ok(TaskState::Warning{ count, endtime }) |
3984a5fd | 270 | } else if !s.is_empty() { |
365915da | 271 | let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string(); |
77bd2a46 | 272 | Ok(TaskState::Error{ message, endtime }) |
4c116baf DC |
273 | } else { |
274 | bail!("unable to parse Task Status '{}'", s); | |
275 | } | |
276 | } | |
277 | } | |
278 | ||
77bd2a46 DC |
279 | impl std::cmp::PartialOrd for TaskState { |
280 | fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { | |
281 | Some(self.endtime().cmp(&other.endtime())) | |
282 | } | |
283 | } | |
284 | ||
285 | impl std::cmp::Ord for TaskState { | |
286 | fn cmp(&self, other: &Self) -> std::cmp::Ordering { | |
287 | self.endtime().cmp(&other.endtime()) | |
288 | } | |
289 | } | |
290 | ||
291 | impl std::fmt::Display for TaskState { | |
292 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
293 | match self { | |
294 | TaskState::Unknown { .. } => write!(f, "unknown"), | |
295 | TaskState::OK { .. }=> write!(f, "OK"), | |
296 | TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count), | |
297 | TaskState::Error { message, .. } => write!(f, "{}", message), | |
298 | } | |
299 | } | |
300 | } | |
301 | ||
/// Task details including parsed UPID
///
/// If there is no `state`, the task is still running.
#[derive(Debug)]
pub struct TaskListInfo {
    /// The parsed UPID
    pub upid: UPID,
    /// UPID string representation
    pub upid_str: String,
    /// Final task state (which also carries the endtime), if already finished
    pub state: Option<TaskState>,
}
314 | ||
66f4e6a8 DC |
315 | fn lock_task_list_files(exclusive: bool) -> Result<std::fs::File, Error> { |
316 | let backup_user = crate::backup::backup_user()?; | |
317 | ||
318 | let lock = open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN, std::time::Duration::new(10, 0), exclusive)?; | |
319 | nix::unistd::chown(PROXMOX_BACKUP_TASK_LOCK_FN, Some(backup_user.uid), Some(backup_user.gid))?; | |
320 | ||
321 | Ok(lock) | |
322 | } | |
323 | ||
9a760917 DC |
324 | /// checks if the Task Archive is bigger that 'size_threshold' bytes, and |
325 | /// rotates it if it is | |
326 | pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> { | |
327 | let _lock = lock_task_list_files(true)?; | |
2d81f7b0 | 328 | |
95ade8fd | 329 | let mut logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, compress) |
e062ebbc | 330 | .ok_or_else(|| format_err!("could not get archive file names"))?; |
95ade8fd TL |
331 | |
332 | logrotate.rotate(size_threshold, None, max_files) | |
9a760917 DC |
333 | } |
334 | ||
// atomically read/update the task list, update status of finished tasks
// new_upid is added to the list when specified.
//
// Runs entirely under the exclusive task list lock:
//  1. read the legacy index file (if any) into `finish_list`,
//  2. read the active file; entries that already have a state, or whose
//     worker is no longer active (per `worker_is_active_local`), are moved to
//     `finish_list` with their final state read from the task log,
//  3. rewrite the active file with the remaining entries (plus `new_upid`),
//  4. append the finished entries to the archive file, ordered by endtime
//     (entries without a state go last, ordered by start time),
//  5. remove the legacy index file if it contained entries.
fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {

    let backup_user = crate::backup::backup_user()?;

    let lock = lock_task_list_files(true)?;

    // TODO remove with 1.x
    let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?;
    let had_index_file = !finish_list.is_empty();

    // We use filter_map because one negative case wants to *move* the data into `finish_list`,
    // clippy doesn't quite catch this!
    #[allow(clippy::unnecessary_filter_map)]
    let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?
        .into_iter()
        .filter_map(|info| {
            if info.state.is_some() {
                // this can happen when the active file still includes finished tasks
                finish_list.push(info);
                return None;
            }

            if !worker_is_active_local(&info.upid) {
                // println!("Detected stopped task '{}'", &info.upid_str);
                // If the task log cannot be read, record the task as
                // `Unknown`, ending now.
                let now = proxmox::tools::time::epoch_i64();
                let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
                finish_list.push(TaskListInfo {
                    upid: info.upid,
                    upid_str: info.upid_str,
                    state: Some(status)
                });
                return None;
            }

            Some(info)
        }).collect();

    if let Some(upid) = new_upid {
        active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
    }

    let active_raw = render_task_list(&active_list);

    replace_file(
        PROXMOX_BACKUP_ACTIVE_TASK_FN,
        active_raw.as_bytes(),
        CreateOptions::new()
            .owner(backup_user.uid)
            .group(backup_user.gid),
    )?;

    // Finished entries by endtime (TaskState's Ord); entries without a state
    // sort after those with one and are ordered by start time.
    finish_list.sort_unstable_by(|a, b| {
        match (&a.state, &b.state) {
            (Some(s1), Some(s2)) => s1.cmp(&s2),
            (Some(_), None) => std::cmp::Ordering::Less,
            (None, Some(_)) => std::cmp::Ordering::Greater,
            _ => a.upid.starttime.cmp(&b.upid.starttime),
        }
    });

    if !finish_list.is_empty() {
        // the archive is append-only; it is created on first use
        match std::fs::OpenOptions::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN) {
            Ok(mut writer) => {
                for info in &finish_list {
                    writer.write_all(render_task_line(&info).as_bytes())?;
                }
            },
            Err(err) => bail!("could not write task archive - {}", err),
        }

        nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_user.uid), Some(backup_user.gid))?;
    }

    // TODO Remove with 1.x
    // for compatibility, if we had an INDEX file, we do not need it anymore
    // (its entries were just appended to the archive above)
    if had_index_file {
        let _ = nix::unistd::unlink(PROXMOX_BACKUP_INDEX_TASK_FN);
    }

    drop(lock);

    Ok(())
}
4b01c983 | 420 | |
bbeb0256 DC |
421 | fn render_task_line(info: &TaskListInfo) -> String { |
422 | let mut raw = String::new(); | |
423 | if let Some(status) = &info.state { | |
424 | raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status)); | |
425 | } else { | |
426 | raw.push_str(&info.upid_str); | |
427 | raw.push('\n'); | |
428 | } | |
429 | ||
430 | raw | |
431 | } | |
432 | ||
433 | fn render_task_list(list: &[TaskListInfo]) -> String { | |
434 | let mut raw = String::new(); | |
435 | for info in list { | |
436 | raw.push_str(&render_task_line(&info)); | |
437 | } | |
438 | raw | |
439 | } | |
440 | ||
784fa1c2 DC |
441 | // note this is not locked, caller has to make sure it is |
442 | // this will skip (and log) lines that are not valid status lines | |
443 | fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error> | |
444 | { | |
445 | let reader = BufReader::new(reader); | |
446 | let mut list = Vec::new(); | |
447 | for line in reader.lines() { | |
448 | let line = line?; | |
449 | match parse_worker_status_line(&line) { | |
450 | Ok((upid_str, upid, state)) => list.push(TaskListInfo { | |
451 | upid_str, | |
452 | upid, | |
453 | state | |
454 | }), | |
455 | Err(err) => { | |
456 | eprintln!("unable to parse worker status '{}' - {}", line, err); | |
457 | continue; | |
458 | } | |
459 | }; | |
460 | } | |
461 | ||
462 | Ok(list) | |
463 | } | |
464 | ||
465 | // note this is not locked, caller has to make sure it is | |
466 | fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error> | |
467 | where | |
468 | P: AsRef<std::path::Path> + std::fmt::Debug, | |
469 | { | |
470 | let file = match File::open(&path) { | |
471 | Ok(f) => f, | |
472 | Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()), | |
473 | Err(err) => bail!("unable to open task list {:?} - {}", path, err), | |
474 | }; | |
475 | ||
476 | read_task_file(file) | |
477 | } | |
478 | ||
/// Iterator over task list entries.
///
/// Yields the active tasks first, then, unless created with `active_only`,
/// the entries of the task archive files. Entries are taken from the back of
/// the buffer, so each list is returned back to front.
pub struct TaskListInfoIterator {
    /// Buffered entries not yet returned.
    list: VecDeque<TaskListInfo>,
    /// Set once no further archive files will be read.
    end: bool,
    /// Archive files still to be read; `None` when listing active tasks only.
    archive: Option<LogRotateFiles>,
    /// Non-exclusive task list lock, released when iteration reaches the end.
    lock: Option<File>,
}
485 | ||
486 | impl TaskListInfoIterator { | |
487 | pub fn new(active_only: bool) -> Result<Self, Error> { | |
488 | let (read_lock, active_list) = { | |
489 | let lock = lock_task_list_files(false)?; | |
490 | let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?; | |
491 | ||
492 | let needs_update = active_list | |
493 | .iter() | |
df4827f2 | 494 | .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid)); |
e7244387 | 495 | |
264779e7 DC |
496 | // TODO remove with 1.x |
497 | let index_exists = std::path::Path::new(PROXMOX_BACKUP_INDEX_TASK_FN).is_file(); | |
498 | ||
499 | if needs_update || index_exists { | |
e7244387 DC |
500 | drop(lock); |
501 | update_active_workers(None)?; | |
502 | let lock = lock_task_list_files(false)?; | |
503 | let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?; | |
504 | (lock, active_list) | |
505 | } else { | |
506 | (lock, active_list) | |
507 | } | |
508 | }; | |
509 | ||
510 | let archive = if active_only { | |
511 | None | |
512 | } else { | |
e4f5f59e TL |
513 | let logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, true) |
514 | .ok_or_else(|| format_err!("could not get archive file names"))?; | |
e7244387 DC |
515 | Some(logrotate.files()) |
516 | }; | |
517 | ||
e7244387 DC |
518 | let lock = if active_only { None } else { Some(read_lock) }; |
519 | ||
520 | Ok(Self { | |
521 | list: active_list.into(), | |
264779e7 | 522 | end: active_only, |
e7244387 DC |
523 | archive, |
524 | lock, | |
525 | }) | |
526 | } | |
527 | } | |
528 | ||
529 | impl Iterator for TaskListInfoIterator { | |
530 | type Item = Result<TaskListInfo, Error>; | |
531 | ||
532 | fn next(&mut self) -> Option<Self::Item> { | |
533 | loop { | |
534 | if let Some(element) = self.list.pop_back() { | |
535 | return Some(Ok(element)); | |
264779e7 DC |
536 | } else if self.end { |
537 | return None; | |
e7244387 | 538 | } else { |
264779e7 DC |
539 | if let Some(mut archive) = self.archive.take() { |
540 | if let Some(file) = archive.next() { | |
541 | let list = match read_task_file(file) { | |
542 | Ok(list) => list, | |
e7244387 DC |
543 | Err(err) => return Some(Err(err)), |
544 | }; | |
264779e7 DC |
545 | self.list.append(&mut list.into()); |
546 | self.archive = Some(archive); | |
547 | continue; | |
e7244387 | 548 | } |
e7244387 | 549 | } |
264779e7 DC |
550 | |
551 | self.end = true; | |
552 | self.lock.take(); | |
e7244387 DC |
553 | } |
554 | } | |
555 | } | |
556 | } | |
557 | ||
/// Launch long running worker tasks.
///
/// A worker task can either be a whole thread, or a simple tokio
/// task/future. Each task can `log()` messages, which are stored
/// persistently to files. Tasks should poll the `abort_requested`
/// flag, and stop execution when requested.
#[derive(Debug)]
pub struct WorkerTask {
    /// Task identifier; its string form is also the task log file name.
    upid: UPID,
    /// Mutable per-task state (logger, progress, warning count, abort listeners).
    data: Mutex<WorkerTaskData>,
    /// Set to `true` by `request_abort`; nothing in this file resets it.
    abort_requested: AtomicBool,
}
570 | ||
impl std::fmt::Display for WorkerTask {

    /// Format a worker by delegating to its UPID.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        self.upid.fmt(f)
    }
}
577 | ||
/// Mutable state of a worker task, guarded by the `data` mutex of `WorkerTask`.
#[derive(Debug)]
struct WorkerTaskData {
    /// Writer for the task log file.
    logger: FileLogger,
    progress: f64, // 0..1
    /// Number of messages logged via `WorkerTask::warn`.
    warn_count: u64,
    /// One-shot senders fired (and drained) by `WorkerTask::request_abort`.
    pub abort_listeners: Vec<oneshot::Sender<()>>,
}
585 | ||
479f6e40 DM |
586 | impl WorkerTask { |
587 | ||
e6dc35ac | 588 | pub fn new(worker_type: &str, worker_id: Option<String>, auth_id: Authid, to_stdout: bool) -> Result<Arc<Self>, Error> { |
e6dc35ac | 589 | let upid = UPID::new(worker_type, worker_id, auth_id)?; |
634132fe | 590 | let task_id = upid.task_id; |
479f6e40 | 591 | |
634132fe | 592 | let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR); |
35950380 | 593 | |
7f3d9100 | 594 | path.push(format!("{:02X}", upid.pstart & 255)); |
479f6e40 | 595 | |
f74a03da | 596 | let backup_user = crate::backup::backup_user()?; |
35950380 | 597 | |
f74a03da | 598 | create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?; |
479f6e40 DM |
599 | |
600 | path.push(upid.to_string()); | |
601 | ||
c0df91f8 | 602 | let logger_options = FileLogOptions { |
913dddea | 603 | to_stdout, |
c0df91f8 | 604 | exclusive: true, |
e5adbc34 | 605 | prefix_time: true, |
c0df91f8 TL |
606 | read: true, |
607 | ..Default::default() | |
608 | }; | |
609 | let logger = FileLogger::new(&path, logger_options)?; | |
f74a03da | 610 | nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?; |
479f6e40 DM |
611 | |
612 | let worker = Arc::new(Self { | |
05d755b2 | 613 | upid: upid.clone(), |
479f6e40 DM |
614 | abort_requested: AtomicBool::new(false), |
615 | data: Mutex::new(WorkerTaskData { | |
616 | logger, | |
617 | progress: 0.0, | |
f6de2c73 | 618 | warn_count: 0, |
75bc49be | 619 | abort_listeners: vec![], |
479f6e40 DM |
620 | }), |
621 | }); | |
622 | ||
05d755b2 DC |
623 | // scope to drop the lock again after inserting |
624 | { | |
625 | let mut hash = WORKER_TASK_LIST.lock().unwrap(); | |
626 | hash.insert(task_id, worker.clone()); | |
627 | super::set_worker_count(hash.len()); | |
628 | } | |
7a630df7 | 629 | |
05d755b2 | 630 | update_active_workers(Some(&upid))?; |
479f6e40 DM |
631 | |
632 | Ok(worker) | |
633 | } | |
634 | ||
882594c5 | 635 | /// Spawn a new tokio task/future. |
660c6846 DM |
636 | pub fn spawn<F, T>( |
637 | worker_type: &str, | |
638 | worker_id: Option<String>, | |
e6dc35ac | 639 | auth_id: Authid, |
660c6846 DM |
640 | to_stdout: bool, |
641 | f: F, | |
642 | ) -> Result<String, Error> | |
479f6e40 | 643 | where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T, |
75fef4b4 | 644 | T: Send + 'static + Future<Output = Result<(), Error>>, |
479f6e40 | 645 | { |
e6dc35ac | 646 | let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?; |
660c6846 | 647 | let upid_str = worker.upid.to_string(); |
75fef4b4 WB |
648 | let f = f(worker.clone()); |
649 | tokio::spawn(async move { | |
650 | let result = f.await; | |
dd8e744f | 651 | worker.log_result(&result); |
75fef4b4 | 652 | }); |
479f6e40 | 653 | |
660c6846 | 654 | Ok(upid_str) |
479f6e40 DM |
655 | } |
656 | ||
882594c5 | 657 | /// Create a new worker thread. |
660c6846 DM |
658 | pub fn new_thread<F>( |
659 | worker_type: &str, | |
660 | worker_id: Option<String>, | |
e6dc35ac | 661 | auth_id: Authid, |
660c6846 DM |
662 | to_stdout: bool, |
663 | f: F, | |
664 | ) -> Result<String, Error> | |
d3f4c08f | 665 | where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error> |
479f6e40 | 666 | { |
e6dc35ac | 667 | let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?; |
660c6846 | 668 | let upid_str = worker.upid.to_string(); |
479f6e40 | 669 | |
217170e1 | 670 | let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || { |
d3f4c08f DM |
671 | let worker1 = worker.clone(); |
672 | let result = match std::panic::catch_unwind(move || f(worker1)) { | |
673 | Ok(r) => r, | |
674 | Err(panic) => { | |
675 | match panic.downcast::<&str>() { | |
676 | Ok(panic_msg) => { | |
677 | Err(format_err!("worker panicked: {}", panic_msg)) | |
678 | } | |
679 | Err(_) => { | |
680 | Err(format_err!("worker panicked: unknown type.")) | |
681 | } | |
682 | } | |
683 | } | |
684 | }; | |
685 | ||
dd8e744f | 686 | worker.log_result(&result); |
479f6e40 DM |
687 | }); |
688 | ||
660c6846 | 689 | Ok(upid_str) |
479f6e40 DM |
690 | } |
691 | ||
4c116baf DC |
692 | /// create state from self and a result |
693 | pub fn create_state(&self, result: &Result<(), Error>) -> TaskState { | |
f6de2c73 | 694 | let warn_count = self.data.lock().unwrap().warn_count; |
cef03f41 | 695 | |
6a7be83e | 696 | let endtime = proxmox::tools::time::epoch_i64(); |
77bd2a46 | 697 | |
4b01c983 | 698 | if let Err(err) = result { |
77bd2a46 | 699 | TaskState::Error { message: err.to_string(), endtime } |
f6de2c73 | 700 | } else if warn_count > 0 { |
77bd2a46 | 701 | TaskState::Warning { count: warn_count, endtime } |
4b01c983 | 702 | } else { |
77bd2a46 | 703 | TaskState::OK { endtime } |
4b01c983 | 704 | } |
cef03f41 DC |
705 | } |
706 | ||
707 | /// Log task result, remove task from running list | |
708 | pub fn log_result(&self, result: &Result<(), Error>) { | |
4c116baf DC |
709 | let state = self.create_state(result); |
710 | self.log(state.result_text()); | |
418def7a DM |
711 | |
712 | WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id); | |
713 | let _ = update_active_workers(None); | |
714 | super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len()); | |
4b01c983 DM |
715 | } |
716 | ||
882594c5 | 717 | /// Log a message. |
479f6e40 DM |
718 | pub fn log<S: AsRef<str>>(&self, msg: S) { |
719 | let mut data = self.data.lock().unwrap(); | |
720 | data.logger.log(msg); | |
721 | } | |
722 | ||
f6de2c73 DC |
723 | /// Log a message as warning. |
724 | pub fn warn<S: AsRef<str>>(&self, msg: S) { | |
725 | let mut data = self.data.lock().unwrap(); | |
726 | data.logger.log(format!("WARN: {}", msg.as_ref())); | |
727 | data.warn_count += 1; | |
728 | } | |
729 | ||
882594c5 | 730 | /// Set progress indicator |
479f6e40 DM |
731 | pub fn progress(&self, progress: f64) { |
732 | if progress >= 0.0 && progress <= 1.0 { | |
733 | let mut data = self.data.lock().unwrap(); | |
734 | data.progress = progress; | |
735 | } else { | |
736 | // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress); | |
737 | } | |
738 | } | |
739 | ||
882594c5 | 740 | /// Request abort |
d607b886 | 741 | pub fn request_abort(&self) { |
98a181f0 | 742 | eprintln!("set abort flag for worker {}", self.upid); |
a6c16894 DM |
743 | |
744 | let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst); | |
745 | if !prev_abort { // log abort one time | |
746 | self.log(format!("received abort request ...")); | |
747 | } | |
75bc49be DM |
748 | // noitify listeners |
749 | let mut data = self.data.lock().unwrap(); | |
750 | loop { | |
751 | match data.abort_listeners.pop() { | |
752 | None => { break; }, | |
753 | Some(ch) => { | |
d1d74c43 | 754 | let _ = ch.send(()); // ignore errors here |
75bc49be DM |
755 | }, |
756 | } | |
757 | } | |
479f6e40 DM |
758 | } |
759 | ||
882594c5 | 760 | /// Test if abort was requested. |
479f6e40 DM |
761 | pub fn abort_requested(&self) -> bool { |
762 | self.abort_requested.load(Ordering::SeqCst) | |
763 | } | |
764 | ||
882594c5 | 765 | /// Fail if abort was requested. |
479f6e40 DM |
766 | pub fn fail_on_abort(&self) -> Result<(), Error> { |
767 | if self.abort_requested() { | |
99641a6b | 768 | bail!("abort requested - aborting task"); |
479f6e40 DM |
769 | } |
770 | Ok(()) | |
771 | } | |
75bc49be DM |
772 | |
773 | /// Get a future which resolves on task abort | |
774 | pub fn abort_future(&self) -> oneshot::Receiver<()> { | |
775 | let (tx, rx) = oneshot::channel::<()>(); | |
776 | ||
777 | let mut data = self.data.lock().unwrap(); | |
778 | if self.abort_requested() { | |
779 | let _ = tx.send(()); | |
780 | } else { | |
781 | data.abort_listeners.push(tx); | |
782 | } | |
783 | rx | |
784 | } | |
4bd2a9e4 DC |
785 | |
786 | pub fn upid(&self) -> &UPID { | |
787 | &self.upid | |
788 | } | |
479f6e40 | 789 | } |
d1993187 WB |
790 | |
791 | impl crate::task::TaskState for WorkerTask { | |
792 | fn check_abort(&self) -> Result<(), Error> { | |
793 | self.fail_on_abort() | |
794 | } | |
795 | ||
796 | fn log(&self, level: log::Level, message: &std::fmt::Arguments) { | |
797 | match level { | |
798 | log::Level::Error => self.warn(&message.to_string()), | |
799 | log::Level::Warn => self.warn(&message.to_string()), | |
800 | log::Level::Info => self.log(&message.to_string()), | |
801 | log::Level::Debug => self.log(&format!("DEBUG: {}", message)), | |
802 | log::Level::Trace => self.log(&format!("TRACE: {}", message)), | |
803 | } | |
804 | } | |
805 | } |