]>
Commit | Line | Data |
---|---|---|
479f6e40 | 1 | use std::collections::HashMap; |
4b01c983 | 2 | use std::fs::File; |
56b66645 | 3 | use std::io::{Read, BufRead, BufReader}; |
d3f4c08f | 4 | use std::panic::UnwindSafe; |
18c0df4c WB |
5 | use std::sync::atomic::{AtomicBool, Ordering}; |
6 | use std::sync::{Arc, Mutex}; | |
d3f4c08f | 7 | |
f7d4e4b5 | 8 | use anyhow::{bail, format_err, Error}; |
18c0df4c WB |
9 | use futures::*; |
10 | use lazy_static::lazy_static; | |
619495b2 | 11 | use nix::unistd::Pid; |
321070b4 | 12 | use serde_json::{json, Value}; |
4c116baf | 13 | use serde::{Serialize, Deserialize}; |
18c0df4c | 14 | use tokio::sync::oneshot; |
479f6e40 | 15 | |
619495b2 | 16 | use proxmox::sys::linux::procfs; |
9ea4bce4 | 17 | use proxmox::try_block; |
98c259b4 | 18 | use proxmox::tools::fs::{create_path, open_file_locked, replace_file, CreateOptions}; |
e18a6c9e | 19 | |
634132fe DM |
20 | use super::UPID; |
21 | ||
e18a6c9e | 22 | use crate::tools::FileLogger; |
e7cb4dc5 | 23 | use crate::api2::types::Userid; |
479f6e40 | 24 | |
// Path fragments are defined as macros (not consts) so they can be combined
// at compile time with `concat!`, which only accepts literals and macro
// invocations.
macro_rules! PROXMOX_BACKUP_VAR_RUN_DIR_M { () => ("/run/proxmox-backup") }
macro_rules! PROXMOX_BACKUP_LOG_DIR_M { () => ("/var/log/proxmox-backup") }
macro_rules! PROXMOX_BACKUP_TASK_DIR_M { () => (concat!( PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks")) }

/// Runtime directory; the (abstract) task control socket names are built from it.
pub const PROXMOX_BACKUP_VAR_RUN_DIR: &str = PROXMOX_BACKUP_VAR_RUN_DIR_M!();
/// Base log directory.
pub const PROXMOX_BACKUP_LOG_DIR: &str = PROXMOX_BACKUP_LOG_DIR_M!();
/// Directory holding the per-task log files.
pub const PROXMOX_BACKUP_TASK_DIR: &str = PROXMOX_BACKUP_TASK_DIR_M!();
/// Lock file serializing read/update cycles of the active task list.
pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/.active.lock");
/// File listing active and recently finished tasks.
pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/active");
lazy_static! {
    /// Workers running in this process, keyed by `UPID::task_id`.
    static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());

    // SAFETY: getpid(2) takes no arguments and cannot fail.
    /// PID of this process.
    static ref MY_PID: i32 = unsafe { libc::getpid() };
    /// Start time of this process as read from procfs. Together with
    /// `MY_PID` it identifies UPIDs that belong to this process.
    /// Panics on first access if the procfs stat of this process cannot be read.
    static ref MY_PID_PSTART: u64 = procfs::PidStat::read_from_pid(Pid::from_raw(*MY_PID))
        .unwrap()
        .starttime;
}
43 | ||
634132fe | 44 | /// Test if the task is still running |
5751e495 DM |
45 | pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> { |
46 | if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) { | |
47 | return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)); | |
48 | } | |
49 | ||
50 | if !procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() { | |
51 | return Ok(false); | |
52 | } | |
53 | ||
54 | let socketname = format!( | |
55 | "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, upid.pid); | |
56 | ||
57 | let cmd = json!({ | |
58 | "command": "status", | |
59 | "upid": upid.to_string(), | |
60 | }); | |
61 | ||
62 | let status = super::send_command(socketname, cmd).await?; | |
4494d078 | 63 | |
5751e495 DM |
64 | if let Some(active) = status.as_bool() { |
65 | Ok(active) | |
66 | } else { | |
67 | bail!("got unexpected result {:?} (expected bool)", status); | |
68 | } | |
69 | } | |
70 | ||
71 | /// Test if the task is still running (fast but inaccurate implementation) | |
72 | /// | |
73 | /// If the task is spanned from a different process, we simply return if | |
74 | /// that process is still running. This information is good enough to detect | |
75 | /// stale tasks... | |
77ebbefc | 76 | pub fn worker_is_active_local(upid: &UPID) -> bool { |
634132fe | 77 | if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) { |
62ee2eb4 | 78 | WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) |
634132fe | 79 | } else { |
62ee2eb4 | 80 | procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() |
479f6e40 DM |
81 | } |
82 | } | |
83 | ||
d607b886 DM |
84 | pub fn create_task_control_socket() -> Result<(), Error> { |
85 | ||
86 | let socketname = format!( | |
9b002cbc | 87 | "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, *MY_PID); |
d607b886 | 88 | |
9b002cbc | 89 | let control_future = super::create_control_socket(socketname, |param| { |
d607b886 | 90 | let param = param.as_object() |
62ee2eb4 | 91 | .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?; |
321070b4 | 92 | if param.keys().count() != 2 { bail!("wrong number of parameters"); } |
d607b886 | 93 | |
5751e495 | 94 | let command = param["command"].as_str() |
62ee2eb4 | 95 | .ok_or_else(|| format_err!("unable to parse parameters (missing command)"))?; |
d607b886 | 96 | |
5751e495 DM |
97 | // we have only two commands for now |
98 | if !(command == "abort-task" || command == "status") { bail!("got unknown command '{}'", command); } | |
d607b886 DM |
99 | |
100 | let upid_str = param["upid"].as_str() | |
62ee2eb4 | 101 | .ok_or_else(|| format_err!("unable to parse parameters (missing upid)"))?; |
d607b886 DM |
102 | |
103 | let upid = upid_str.parse::<UPID>()?; | |
104 | ||
105 | if !((upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART)) { | |
106 | bail!("upid does not belong to this process"); | |
107 | } | |
108 | ||
109 | let hash = WORKER_TASK_LIST.lock().unwrap(); | |
5751e495 DM |
110 | |
111 | match command { | |
112 | "abort-task" => { | |
113 | if let Some(ref worker) = hash.get(&upid.task_id) { | |
114 | worker.request_abort(); | |
115 | } else { | |
116 | // assume task is already stopped | |
117 | } | |
118 | Ok(Value::Null) | |
119 | } | |
120 | "status" => { | |
121 | let active = hash.contains_key(&upid.task_id); | |
122 | Ok(active.into()) | |
123 | } | |
124 | _ => { | |
125 | bail!("got unknown command '{}'", command); | |
126 | } | |
d607b886 | 127 | } |
d607b886 DM |
128 | })?; |
129 | ||
130 | tokio::spawn(control_future); | |
131 | ||
132 | Ok(()) | |
133 | } | |
134 | ||
321070b4 | 135 | pub fn abort_worker_async(upid: UPID) { |
75fef4b4 WB |
136 | tokio::spawn(async move { |
137 | if let Err(err) = abort_worker(upid).await { | |
321070b4 DM |
138 | eprintln!("abort worker failed - {}", err); |
139 | } | |
75fef4b4 | 140 | }); |
321070b4 DM |
141 | } |
142 | ||
5751e495 | 143 | pub async fn abort_worker(upid: UPID) -> Result<(), Error> { |
321070b4 DM |
144 | |
145 | let target_pid = upid.pid; | |
146 | ||
147 | let socketname = format!( | |
148 | "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, target_pid); | |
149 | ||
150 | let cmd = json!({ | |
151 | "command": "abort-task", | |
152 | "upid": upid.to_string(), | |
153 | }); | |
154 | ||
5751e495 | 155 | super::send_command(socketname, cmd).map_ok(|_| ()).await |
321070b4 DM |
156 | } |
157 | ||
77bd2a46 | 158 | fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> { |
4b01c983 DM |
159 | |
160 | let data = line.splitn(3, ' ').collect::<Vec<&str>>(); | |
161 | ||
162 | let len = data.len(); | |
163 | ||
164 | match len { | |
165 | 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)), | |
166 | 3 => { | |
167 | let endtime = i64::from_str_radix(data[1], 16)?; | |
77bd2a46 DC |
168 | let state = TaskState::from_endtime_and_message(endtime, data[2])?; |
169 | Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state))) | |
4b01c983 DM |
170 | } |
171 | _ => bail!("wrong number of components"), | |
172 | } | |
173 | } | |
174 | ||
35950380 | 175 | /// Create task log directory with correct permissions |
d607b886 | 176 | pub fn create_task_log_dirs() -> Result<(), Error> { |
35950380 DM |
177 | |
178 | try_block!({ | |
f74a03da | 179 | let backup_user = crate::backup::backup_user()?; |
35238e23 | 180 | let opts = CreateOptions::new() |
f74a03da DM |
181 | .owner(backup_user.uid) |
182 | .group(backup_user.gid); | |
35950380 | 183 | |
35238e23 WB |
184 | create_path(PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?; |
185 | create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?; | |
186 | create_path(PROXMOX_BACKUP_VAR_RUN_DIR, None, Some(opts))?; | |
35950380 DM |
187 | Ok(()) |
188 | }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?; | |
189 | ||
190 | Ok(()) | |
191 | } | |
192 | ||
ae197dda DC |
193 | /// Read endtime (time of last log line) and exitstatus from task log file |
194 | /// If there is not a single line with at valid datetime, we assume the | |
195 | /// starttime to be the endtime | |
77bd2a46 | 196 | pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> { |
56b66645 DM |
197 | |
198 | let mut status = TaskState::Unknown { endtime: upid.starttime }; | |
4b01c983 | 199 | |
4494d078 | 200 | let path = upid.log_path(); |
4b01c983 | 201 | |
0bfd87bc DM |
202 | let mut file = File::open(path)?; |
203 | ||
204 | /// speedup - only read tail | |
205 | use std::io::Seek; | |
206 | use std::io::SeekFrom; | |
207 | let _ = file.seek(SeekFrom::End(-8192)); // ignore errors | |
208 | ||
56b66645 DM |
209 | let mut data = Vec::with_capacity(8192); |
210 | file.read_to_end(&mut data)?; | |
4b01c983 | 211 | |
a4c11436 DC |
212 | // task logs should end with newline, we do not want it here |
213 | if data[data.len()-1] == b'\n' { | |
214 | data.pop(); | |
215 | } | |
216 | ||
56b66645 DM |
217 | let last_line = { |
218 | let mut start = 0; | |
a4c11436 | 219 | for pos in (0..data.len()).rev() { |
56b66645 DM |
220 | if data[pos] == b'\n' { |
221 | start = pos + 1; | |
222 | break; | |
223 | } | |
ae197dda | 224 | } |
56b66645 DM |
225 | &data[start..] |
226 | }; | |
227 | ||
228 | let last_line = std::str::from_utf8(last_line) | |
229 | .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?; | |
230 | ||
231 | let mut iter = last_line.splitn(2, ": "); | |
232 | if let Some(time_str) = iter.next() { | |
6a7be83e | 233 | if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) { |
56b66645 | 234 | if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) { |
77bd2a46 | 235 | if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) { |
4c116baf | 236 | status = state; |
4b01c983 DM |
237 | } |
238 | } | |
239 | } | |
240 | } | |
241 | ||
77bd2a46 | 242 | Ok(status) |
4b01c983 DM |
243 | } |
244 | ||
/// Task State
///
/// Every variant carries `endtime`, the time the task finished. It is filled
/// from `proxmox::tools::time::epoch_i64()` or a parsed RFC 3339 log
/// timestamp, presumably Unix epoch seconds - TODO confirm.
///
/// Note: equality (derived) compares all fields, while the manual
/// `PartialOrd`/`Ord` impls compare `endtime` only.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskState {
    /// The Task ended with an undefined state
    Unknown { endtime: i64 },
    /// The Task ended and there were no errors or warnings
    OK { endtime: i64 },
    /// The Task had 'count' amount of warnings and no errors
    Warning { count: u64, endtime: i64 },
    /// The Task ended with the error described in 'message'
    Error { message: String, endtime: i64 },
}
257 | ||
258 | impl TaskState { | |
77bd2a46 DC |
259 | pub fn endtime(&self) -> i64 { |
260 | match *self { | |
261 | TaskState::Unknown { endtime } => endtime, | |
262 | TaskState::OK { endtime } => endtime, | |
263 | TaskState::Warning { endtime, .. } => endtime, | |
264 | TaskState::Error { endtime, .. } => endtime, | |
4c116baf DC |
265 | } |
266 | } | |
4c116baf | 267 | |
77bd2a46 | 268 | fn result_text(&self) -> String { |
4c116baf | 269 | match self { |
77bd2a46 DC |
270 | TaskState::Error { message, .. } => format!("TASK ERROR: {}", message), |
271 | other => format!("TASK {}", other), | |
4c116baf DC |
272 | } |
273 | } | |
4c116baf | 274 | |
77bd2a46 | 275 | fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> { |
4c116baf | 276 | if s == "unknown" { |
77bd2a46 | 277 | Ok(TaskState::Unknown { endtime }) |
4c116baf | 278 | } else if s == "OK" { |
77bd2a46 | 279 | Ok(TaskState::OK { endtime }) |
4c116baf DC |
280 | } else if s.starts_with("WARNINGS: ") { |
281 | let count: u64 = s[10..].parse()?; | |
77bd2a46 | 282 | Ok(TaskState::Warning{ count, endtime }) |
4c116baf DC |
283 | } else if s.len() > 0 { |
284 | let message = if s.starts_with("ERROR: ") { &s[7..] } else { s }.to_string(); | |
77bd2a46 | 285 | Ok(TaskState::Error{ message, endtime }) |
4c116baf DC |
286 | } else { |
287 | bail!("unable to parse Task Status '{}'", s); | |
288 | } | |
289 | } | |
290 | } | |
291 | ||
77bd2a46 DC |
292 | impl std::cmp::PartialOrd for TaskState { |
293 | fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { | |
294 | Some(self.endtime().cmp(&other.endtime())) | |
295 | } | |
296 | } | |
297 | ||
298 | impl std::cmp::Ord for TaskState { | |
299 | fn cmp(&self, other: &Self) -> std::cmp::Ordering { | |
300 | self.endtime().cmp(&other.endtime()) | |
301 | } | |
302 | } | |
303 | ||
304 | impl std::fmt::Display for TaskState { | |
305 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
306 | match self { | |
307 | TaskState::Unknown { .. } => write!(f, "unknown"), | |
308 | TaskState::OK { .. }=> write!(f, "OK"), | |
309 | TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count), | |
310 | TaskState::Error { message, .. } => write!(f, "{}", message), | |
311 | } | |
312 | } | |
313 | } | |
314 | ||
/// Task details including parsed UPID
///
/// If there is no `state`, the task is still running.
#[derive(Debug)]
pub struct TaskListInfo {
    /// The parsed UPID
    pub upid: UPID,
    /// UPID string representation
    pub upid_str: String,
    /// Final task state (which includes its endtime) if already finished
    pub state: Option<TaskState>,
}
327 | ||
328 | // atomically read/update the task list, update status of finished tasks | |
329 | // new_upid is added to the list when specified. | |
330 | // Returns a sorted list of known tasks, | |
331 | fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, Error> { | |
4b01c983 | 332 | |
f74a03da | 333 | let backup_user = crate::backup::backup_user()?; |
35950380 | 334 | |
98c259b4 | 335 | let lock = open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN, std::time::Duration::new(10, 0))?; |
f74a03da | 336 | nix::unistd::chown(PROXMOX_BACKUP_TASK_LOCK_FN, Some(backup_user.uid), Some(backup_user.gid))?; |
4b01c983 | 337 | |
634132fe | 338 | let reader = match File::open(PROXMOX_BACKUP_ACTIVE_TASK_FN) { |
4b01c983 DM |
339 | Ok(f) => Some(BufReader::new(f)), |
340 | Err(err) => { | |
341 | if err.kind() == std::io::ErrorKind::NotFound { | |
342 | None | |
343 | } else { | |
634132fe | 344 | bail!("unable to open active worker {:?} - {}", PROXMOX_BACKUP_ACTIVE_TASK_FN, err); |
4b01c983 DM |
345 | } |
346 | } | |
347 | }; | |
348 | ||
4b01c983 DM |
349 | let mut active_list = vec![]; |
350 | let mut finish_list = vec![]; | |
351 | ||
352 | if let Some(lines) = reader.map(|r| r.lines()) { | |
353 | ||
354 | for line in lines { | |
355 | let line = line?; | |
356 | match parse_worker_status_line(&line) { | |
357 | Err(err) => bail!("unable to parse active worker status '{}' - {}", line, err), | |
fae11693 DC |
358 | Ok((upid_str, upid, state)) => match state { |
359 | None if worker_is_active_local(&upid) => { | |
4b01c983 | 360 | active_list.push(TaskListInfo { upid, upid_str, state: None }); |
fae11693 DC |
361 | }, |
362 | None => { | |
363 | println!("Detected stopped UPID {}", upid_str); | |
6a7be83e | 364 | let now = proxmox::tools::time::epoch_i64(); |
77bd2a46 | 365 | let status = upid_read_status(&upid) |
6a7be83e | 366 | .unwrap_or_else(|_| TaskState::Unknown { endtime: now }); |
fae11693 | 367 | finish_list.push(TaskListInfo { |
77bd2a46 | 368 | upid, upid_str, state: Some(status) |
fae11693 DC |
369 | }); |
370 | }, | |
77bd2a46 | 371 | Some(status) => { |
fae11693 | 372 | finish_list.push(TaskListInfo { |
77bd2a46 | 373 | upid, upid_str, state: Some(status) |
fae11693 | 374 | }) |
4b01c983 DM |
375 | } |
376 | } | |
377 | } | |
378 | } | |
379 | } | |
380 | ||
381 | if let Some(upid) = new_upid { | |
382 | active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None }); | |
383 | } | |
384 | ||
385 | // assemble list without duplicates | |
386 | // we include all active tasks, | |
387 | // and fill up to 1000 entries with finished tasks | |
388 | ||
389 | let max = 1000; | |
390 | ||
391 | let mut task_hash = HashMap::new(); | |
392 | ||
393 | for info in active_list { | |
394 | task_hash.insert(info.upid_str.clone(), info); | |
395 | } | |
396 | ||
397 | for info in finish_list { | |
398 | if task_hash.len() > max { break; } | |
399 | if !task_hash.contains_key(&info.upid_str) { | |
400 | task_hash.insert(info.upid_str.clone(), info); | |
401 | } | |
402 | } | |
403 | ||
93aebb38 DM |
404 | let mut task_list: Vec<TaskListInfo> = vec![]; |
405 | for (_, info) in task_hash { task_list.push(info); } | |
406 | ||
ba70040d | 407 | task_list.sort_unstable_by(|b, a| { // lastest on top |
4b01c983 | 408 | match (&a.state, &b.state) { |
77bd2a46 | 409 | (Some(s1), Some(s2)) => s1.cmp(&s2), |
4b01c983 DM |
410 | (Some(_), None) => std::cmp::Ordering::Less, |
411 | (None, Some(_)) => std::cmp::Ordering::Greater, | |
412 | _ => a.upid.starttime.cmp(&b.upid.starttime), | |
413 | } | |
414 | }); | |
415 | ||
416 | let mut raw = String::new(); | |
417 | for info in &task_list { | |
77bd2a46 DC |
418 | if let Some(status) = &info.state { |
419 | raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status)); | |
4b01c983 DM |
420 | } else { |
421 | raw.push_str(&info.upid_str); | |
422 | raw.push('\n'); | |
423 | } | |
424 | } | |
425 | ||
feaa1ad3 WB |
426 | replace_file( |
427 | PROXMOX_BACKUP_ACTIVE_TASK_FN, | |
428 | raw.as_bytes(), | |
429 | CreateOptions::new() | |
f74a03da DM |
430 | .owner(backup_user.uid) |
431 | .group(backup_user.gid), | |
feaa1ad3 | 432 | )?; |
4b01c983 DM |
433 | |
434 | drop(lock); | |
435 | ||
93aebb38 | 436 | Ok(task_list) |
4b01c983 DM |
437 | } |
438 | ||
93aebb38 DM |
439 | /// Returns a sorted list of known tasks |
440 | /// | |
441 | /// The list is sorted by `(starttime, endtime)` in ascending order | |
442 | pub fn read_task_list() -> Result<Vec<TaskListInfo>, Error> { | |
443 | update_active_workers(None) | |
444 | } | |
4b01c983 | 445 | |
/// Launch long running worker tasks.
///
/// A worker task can either be a whole thread, or a simple tokio
/// task/future. Each task can `log()` messages, which are stored
/// persistently to files. Tasks should poll the `abort_requested`
/// flag, and stop execution when requested.
#[derive(Debug)]
pub struct WorkerTask {
    /// Unique task identifier; its string form names the task log file.
    upid: UPID,
    /// Mutable state (logger, progress, warning count, abort listeners).
    data: Mutex<WorkerTaskData>,
    /// Set by `request_abort()`, read by `abort_requested()`.
    abort_requested: AtomicBool,
}
458 | ||
impl std::fmt::Display for WorkerTask {

    /// Formats the worker by delegating to its UPID.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        self.upid.fmt(f)
    }
}
465 | ||
/// Mutable per-worker state, guarded by the `WorkerTask::data` mutex.
#[derive(Debug)]
struct WorkerTaskData {
    /// Writer for the task log file.
    logger: FileLogger,
    progress: f64, // 0..1
    /// Number of messages logged via `WorkerTask::warn()`.
    warn_count: u64,
    /// Senders drained and notified by `WorkerTask::request_abort()`.
    pub abort_listeners: Vec<oneshot::Sender<()>>,
}
473 | ||
impl Drop for WorkerTask {

    /// Only prints a debug message; the worker is removed from the worker
    /// list in `log_result()`.
    fn drop(&mut self) {
        println!("unregister worker");
    }
}
480 | ||
481 | impl WorkerTask { | |
482 | ||
e7cb4dc5 | 483 | pub fn new(worker_type: &str, worker_id: Option<String>, userid: Userid, to_stdout: bool) -> Result<Arc<Self>, Error> { |
479f6e40 DM |
484 | println!("register worker"); |
485 | ||
e7cb4dc5 | 486 | let upid = UPID::new(worker_type, worker_id, userid)?; |
634132fe | 487 | let task_id = upid.task_id; |
479f6e40 | 488 | |
634132fe | 489 | let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR); |
35950380 | 490 | |
479f6e40 DM |
491 | path.push(format!("{:02X}", upid.pstart % 256)); |
492 | ||
f74a03da | 493 | let backup_user = crate::backup::backup_user()?; |
35950380 | 494 | |
f74a03da | 495 | create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?; |
479f6e40 DM |
496 | |
497 | path.push(upid.to_string()); | |
498 | ||
499 | println!("FILE: {:?}", path); | |
500 | ||
35950380 | 501 | let logger = FileLogger::new(&path, to_stdout)?; |
f74a03da | 502 | nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?; |
479f6e40 DM |
503 | |
504 | let worker = Arc::new(Self { | |
05d755b2 | 505 | upid: upid.clone(), |
479f6e40 DM |
506 | abort_requested: AtomicBool::new(false), |
507 | data: Mutex::new(WorkerTaskData { | |
508 | logger, | |
509 | progress: 0.0, | |
f6de2c73 | 510 | warn_count: 0, |
75bc49be | 511 | abort_listeners: vec![], |
479f6e40 DM |
512 | }), |
513 | }); | |
514 | ||
05d755b2 DC |
515 | // scope to drop the lock again after inserting |
516 | { | |
517 | let mut hash = WORKER_TASK_LIST.lock().unwrap(); | |
518 | hash.insert(task_id, worker.clone()); | |
519 | super::set_worker_count(hash.len()); | |
520 | } | |
7a630df7 | 521 | |
05d755b2 | 522 | update_active_workers(Some(&upid))?; |
479f6e40 DM |
523 | |
524 | Ok(worker) | |
525 | } | |
526 | ||
882594c5 | 527 | /// Spawn a new tokio task/future. |
660c6846 DM |
528 | pub fn spawn<F, T>( |
529 | worker_type: &str, | |
530 | worker_id: Option<String>, | |
e7cb4dc5 | 531 | userid: Userid, |
660c6846 DM |
532 | to_stdout: bool, |
533 | f: F, | |
534 | ) -> Result<String, Error> | |
479f6e40 | 535 | where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T, |
75fef4b4 | 536 | T: Send + 'static + Future<Output = Result<(), Error>>, |
479f6e40 | 537 | { |
e7cb4dc5 | 538 | let worker = WorkerTask::new(worker_type, worker_id, userid, to_stdout)?; |
660c6846 | 539 | let upid_str = worker.upid.to_string(); |
75fef4b4 WB |
540 | let f = f(worker.clone()); |
541 | tokio::spawn(async move { | |
542 | let result = f.await; | |
dd8e744f | 543 | worker.log_result(&result); |
75fef4b4 | 544 | }); |
479f6e40 | 545 | |
660c6846 | 546 | Ok(upid_str) |
479f6e40 DM |
547 | } |
548 | ||
882594c5 | 549 | /// Create a new worker thread. |
660c6846 DM |
550 | pub fn new_thread<F>( |
551 | worker_type: &str, | |
552 | worker_id: Option<String>, | |
e7cb4dc5 | 553 | userid: Userid, |
660c6846 DM |
554 | to_stdout: bool, |
555 | f: F, | |
556 | ) -> Result<String, Error> | |
d3f4c08f | 557 | where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error> |
479f6e40 DM |
558 | { |
559 | println!("register worker thread"); | |
560 | ||
e7cb4dc5 | 561 | let worker = WorkerTask::new(worker_type, worker_id, userid, to_stdout)?; |
660c6846 | 562 | let upid_str = worker.upid.to_string(); |
479f6e40 | 563 | |
217170e1 | 564 | let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || { |
d3f4c08f DM |
565 | let worker1 = worker.clone(); |
566 | let result = match std::panic::catch_unwind(move || f(worker1)) { | |
567 | Ok(r) => r, | |
568 | Err(panic) => { | |
569 | match panic.downcast::<&str>() { | |
570 | Ok(panic_msg) => { | |
571 | Err(format_err!("worker panicked: {}", panic_msg)) | |
572 | } | |
573 | Err(_) => { | |
574 | Err(format_err!("worker panicked: unknown type.")) | |
575 | } | |
576 | } | |
577 | } | |
578 | }; | |
579 | ||
dd8e744f | 580 | worker.log_result(&result); |
479f6e40 DM |
581 | }); |
582 | ||
660c6846 | 583 | Ok(upid_str) |
479f6e40 DM |
584 | } |
585 | ||
4c116baf DC |
586 | /// create state from self and a result |
587 | pub fn create_state(&self, result: &Result<(), Error>) -> TaskState { | |
f6de2c73 | 588 | let warn_count = self.data.lock().unwrap().warn_count; |
cef03f41 | 589 | |
6a7be83e | 590 | let endtime = proxmox::tools::time::epoch_i64(); |
77bd2a46 | 591 | |
4b01c983 | 592 | if let Err(err) = result { |
77bd2a46 | 593 | TaskState::Error { message: err.to_string(), endtime } |
f6de2c73 | 594 | } else if warn_count > 0 { |
77bd2a46 | 595 | TaskState::Warning { count: warn_count, endtime } |
4b01c983 | 596 | } else { |
77bd2a46 | 597 | TaskState::OK { endtime } |
4b01c983 | 598 | } |
cef03f41 DC |
599 | } |
600 | ||
601 | /// Log task result, remove task from running list | |
602 | pub fn log_result(&self, result: &Result<(), Error>) { | |
4c116baf DC |
603 | let state = self.create_state(result); |
604 | self.log(state.result_text()); | |
418def7a DM |
605 | |
606 | WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id); | |
607 | let _ = update_active_workers(None); | |
608 | super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len()); | |
4b01c983 DM |
609 | } |
610 | ||
882594c5 | 611 | /// Log a message. |
479f6e40 DM |
612 | pub fn log<S: AsRef<str>>(&self, msg: S) { |
613 | let mut data = self.data.lock().unwrap(); | |
614 | data.logger.log(msg); | |
615 | } | |
616 | ||
f6de2c73 DC |
617 | /// Log a message as warning. |
618 | pub fn warn<S: AsRef<str>>(&self, msg: S) { | |
619 | let mut data = self.data.lock().unwrap(); | |
620 | data.logger.log(format!("WARN: {}", msg.as_ref())); | |
621 | data.warn_count += 1; | |
622 | } | |
623 | ||
882594c5 | 624 | /// Set progress indicator |
479f6e40 DM |
625 | pub fn progress(&self, progress: f64) { |
626 | if progress >= 0.0 && progress <= 1.0 { | |
627 | let mut data = self.data.lock().unwrap(); | |
628 | data.progress = progress; | |
629 | } else { | |
630 | // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress); | |
631 | } | |
632 | } | |
633 | ||
882594c5 | 634 | /// Request abort |
d607b886 | 635 | pub fn request_abort(&self) { |
98a181f0 | 636 | eprintln!("set abort flag for worker {}", self.upid); |
479f6e40 | 637 | self.abort_requested.store(true, Ordering::SeqCst); |
75bc49be DM |
638 | // noitify listeners |
639 | let mut data = self.data.lock().unwrap(); | |
640 | loop { | |
641 | match data.abort_listeners.pop() { | |
642 | None => { break; }, | |
643 | Some(ch) => { | |
644 | let _ = ch.send(()); // ignore erros here | |
645 | }, | |
646 | } | |
647 | } | |
479f6e40 DM |
648 | } |
649 | ||
882594c5 | 650 | /// Test if abort was requested. |
479f6e40 DM |
651 | pub fn abort_requested(&self) -> bool { |
652 | self.abort_requested.load(Ordering::SeqCst) | |
653 | } | |
654 | ||
882594c5 | 655 | /// Fail if abort was requested. |
479f6e40 DM |
656 | pub fn fail_on_abort(&self) -> Result<(), Error> { |
657 | if self.abort_requested() { | |
99641a6b | 658 | bail!("abort requested - aborting task"); |
479f6e40 DM |
659 | } |
660 | Ok(()) | |
661 | } | |
75bc49be DM |
662 | |
663 | /// Get a future which resolves on task abort | |
664 | pub fn abort_future(&self) -> oneshot::Receiver<()> { | |
665 | let (tx, rx) = oneshot::channel::<()>(); | |
666 | ||
667 | let mut data = self.data.lock().unwrap(); | |
668 | if self.abort_requested() { | |
669 | let _ = tx.send(()); | |
670 | } else { | |
671 | data.abort_listeners.push(tx); | |
672 | } | |
673 | rx | |
674 | } | |
4bd2a9e4 DC |
675 | |
676 | pub fn upid(&self) -> &UPID { | |
677 | &self.upid | |
678 | } | |
479f6e40 | 679 | } |