]>
Commit | Line | Data |
---|---|---|
e7244387 | 1 | use std::collections::{HashMap, VecDeque}; |
4b01c983 | 2 | use std::fs::File; |
9a760917 | 3 | use std::path::Path; |
5ade6c25 | 4 | use std::io::{Read, Write, BufRead, BufReader}; |
d3f4c08f | 5 | use std::panic::UnwindSafe; |
18c0df4c WB |
6 | use std::sync::atomic::{AtomicBool, Ordering}; |
7 | use std::sync::{Arc, Mutex}; | |
d3f4c08f | 8 | |
f7d4e4b5 | 9 | use anyhow::{bail, format_err, Error}; |
18c0df4c WB |
10 | use futures::*; |
11 | use lazy_static::lazy_static; | |
619495b2 | 12 | use nix::unistd::Pid; |
321070b4 | 13 | use serde_json::{json, Value}; |
4c116baf | 14 | use serde::{Serialize, Deserialize}; |
18c0df4c | 15 | use tokio::sync::oneshot; |
479f6e40 | 16 | |
619495b2 | 17 | use proxmox::sys::linux::procfs; |
9ea4bce4 | 18 | use proxmox::try_block; |
98c259b4 | 19 | use proxmox::tools::fs::{create_path, open_file_locked, replace_file, CreateOptions}; |
e18a6c9e | 20 | |
634132fe DM |
21 | use super::UPID; |
22 | ||
e7244387 | 23 | use crate::tools::logrotate::{LogRotate, LogRotateFiles}; |
c0df91f8 | 24 | use crate::tools::{FileLogger, FileLogOptions}; |
e7cb4dc5 | 25 | use crate::api2::types::Userid; |
479f6e40 | 26 | |
// Location macros. They exist (in addition to the consts below) so that
// paths can be combined at compile time with `concat!`.
macro_rules! PROXMOX_BACKUP_VAR_RUN_DIR_M { () => ("/run/proxmox-backup") }
macro_rules! PROXMOX_BACKUP_LOG_DIR_M { () => ("/var/log/proxmox-backup") }
macro_rules! PROXMOX_BACKUP_TASK_DIR_M { () => (concat!( PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks")) }

/// Runtime directory; its path is embedded in the task control socket names.
pub const PROXMOX_BACKUP_VAR_RUN_DIR: &str = PROXMOX_BACKUP_VAR_RUN_DIR_M!();
/// Base log directory.
pub const PROXMOX_BACKUP_LOG_DIR: &str = PROXMOX_BACKUP_LOG_DIR_M!();
/// Directory holding the per-task log files and the task list files.
pub const PROXMOX_BACKUP_TASK_DIR: &str = PROXMOX_BACKUP_TASK_DIR_M!();
/// Lock file guarding the active/index/archive task list files.
pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/.active.lock");
/// Task list of tasks considered running at the last update.
pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/active");
/// Task list of the most recently finished tasks (at most `MAX_INDEX_TASKS`).
pub const PROXMOX_BACKUP_INDEX_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/index");
/// Older finished tasks that overflowed the index file (subject to log rotation).
pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/archive");

/// Maximum number of finished tasks kept in the index file; older entries
/// are appended to the archive.
const MAX_INDEX_TASKS: usize = 1000;
479f6e40 DM |
40 | |
41 | lazy_static! { | |
42 | static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new()); | |
d607b886 DM |
43 | |
44 | static ref MY_PID: i32 = unsafe { libc::getpid() }; | |
6a0dc4a5 | 45 | static ref MY_PID_PSTART: u64 = procfs::PidStat::read_from_pid(Pid::from_raw(*MY_PID)) |
619495b2 WB |
46 | .unwrap() |
47 | .starttime; | |
479f6e40 DM |
48 | } |
49 | ||
634132fe | 50 | /// Test if the task is still running |
5751e495 DM |
51 | pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> { |
52 | if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) { | |
53 | return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)); | |
54 | } | |
55 | ||
56 | if !procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() { | |
57 | return Ok(false); | |
58 | } | |
59 | ||
60 | let socketname = format!( | |
61 | "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, upid.pid); | |
62 | ||
63 | let cmd = json!({ | |
64 | "command": "status", | |
65 | "upid": upid.to_string(), | |
66 | }); | |
67 | ||
68 | let status = super::send_command(socketname, cmd).await?; | |
4494d078 | 69 | |
5751e495 DM |
70 | if let Some(active) = status.as_bool() { |
71 | Ok(active) | |
72 | } else { | |
73 | bail!("got unexpected result {:?} (expected bool)", status); | |
74 | } | |
75 | } | |
76 | ||
77 | /// Test if the task is still running (fast but inaccurate implementation) | |
78 | /// | |
79 | /// If the task is spanned from a different process, we simply return if | |
80 | /// that process is still running. This information is good enough to detect | |
81 | /// stale tasks... | |
77ebbefc | 82 | pub fn worker_is_active_local(upid: &UPID) -> bool { |
634132fe | 83 | if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) { |
62ee2eb4 | 84 | WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) |
634132fe | 85 | } else { |
62ee2eb4 | 86 | procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() |
479f6e40 DM |
87 | } |
88 | } | |
89 | ||
d607b886 DM |
90 | pub fn create_task_control_socket() -> Result<(), Error> { |
91 | ||
92 | let socketname = format!( | |
9b002cbc | 93 | "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, *MY_PID); |
d607b886 | 94 | |
9b002cbc | 95 | let control_future = super::create_control_socket(socketname, |param| { |
e4f5f59e TL |
96 | let param = param |
97 | .as_object() | |
62ee2eb4 | 98 | .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?; |
321070b4 | 99 | if param.keys().count() != 2 { bail!("wrong number of parameters"); } |
d607b886 | 100 | |
e4f5f59e TL |
101 | let command = param["command"] |
102 | .as_str() | |
62ee2eb4 | 103 | .ok_or_else(|| format_err!("unable to parse parameters (missing command)"))?; |
d607b886 | 104 | |
5751e495 | 105 | // we have only two commands for now |
e4f5f59e TL |
106 | if !(command == "abort-task" || command == "status") { |
107 | bail!("got unknown command '{}'", command); | |
108 | } | |
d607b886 | 109 | |
e4f5f59e TL |
110 | let upid_str = param["upid"] |
111 | .as_str() | |
62ee2eb4 | 112 | .ok_or_else(|| format_err!("unable to parse parameters (missing upid)"))?; |
d607b886 DM |
113 | |
114 | let upid = upid_str.parse::<UPID>()?; | |
115 | ||
e4f5f59e | 116 | if !(upid.pid == *MY_PID && upid.pstart == *MY_PID_PSTART) { |
d607b886 DM |
117 | bail!("upid does not belong to this process"); |
118 | } | |
119 | ||
120 | let hash = WORKER_TASK_LIST.lock().unwrap(); | |
5751e495 DM |
121 | |
122 | match command { | |
123 | "abort-task" => { | |
124 | if let Some(ref worker) = hash.get(&upid.task_id) { | |
125 | worker.request_abort(); | |
126 | } else { | |
127 | // assume task is already stopped | |
128 | } | |
129 | Ok(Value::Null) | |
130 | } | |
131 | "status" => { | |
132 | let active = hash.contains_key(&upid.task_id); | |
133 | Ok(active.into()) | |
134 | } | |
135 | _ => { | |
136 | bail!("got unknown command '{}'", command); | |
137 | } | |
d607b886 | 138 | } |
d607b886 DM |
139 | })?; |
140 | ||
141 | tokio::spawn(control_future); | |
142 | ||
143 | Ok(()) | |
144 | } | |
145 | ||
321070b4 | 146 | pub fn abort_worker_async(upid: UPID) { |
75fef4b4 WB |
147 | tokio::spawn(async move { |
148 | if let Err(err) = abort_worker(upid).await { | |
321070b4 DM |
149 | eprintln!("abort worker failed - {}", err); |
150 | } | |
75fef4b4 | 151 | }); |
321070b4 DM |
152 | } |
153 | ||
5751e495 | 154 | pub async fn abort_worker(upid: UPID) -> Result<(), Error> { |
321070b4 DM |
155 | |
156 | let target_pid = upid.pid; | |
157 | ||
158 | let socketname = format!( | |
159 | "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, target_pid); | |
160 | ||
161 | let cmd = json!({ | |
162 | "command": "abort-task", | |
163 | "upid": upid.to_string(), | |
164 | }); | |
165 | ||
5751e495 | 166 | super::send_command(socketname, cmd).map_ok(|_| ()).await |
321070b4 DM |
167 | } |
168 | ||
77bd2a46 | 169 | fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> { |
4b01c983 DM |
170 | |
171 | let data = line.splitn(3, ' ').collect::<Vec<&str>>(); | |
172 | ||
173 | let len = data.len(); | |
174 | ||
175 | match len { | |
176 | 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)), | |
177 | 3 => { | |
178 | let endtime = i64::from_str_radix(data[1], 16)?; | |
77bd2a46 DC |
179 | let state = TaskState::from_endtime_and_message(endtime, data[2])?; |
180 | Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some(state))) | |
4b01c983 DM |
181 | } |
182 | _ => bail!("wrong number of components"), | |
183 | } | |
184 | } | |
185 | ||
35950380 | 186 | /// Create task log directory with correct permissions |
d607b886 | 187 | pub fn create_task_log_dirs() -> Result<(), Error> { |
35950380 DM |
188 | |
189 | try_block!({ | |
f74a03da | 190 | let backup_user = crate::backup::backup_user()?; |
35238e23 | 191 | let opts = CreateOptions::new() |
f74a03da DM |
192 | .owner(backup_user.uid) |
193 | .group(backup_user.gid); | |
35950380 | 194 | |
35238e23 WB |
195 | create_path(PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?; |
196 | create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?; | |
197 | create_path(PROXMOX_BACKUP_VAR_RUN_DIR, None, Some(opts))?; | |
35950380 DM |
198 | Ok(()) |
199 | }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?; | |
200 | ||
201 | Ok(()) | |
202 | } | |
203 | ||
ae197dda DC |
204 | /// Read endtime (time of last log line) and exitstatus from task log file |
205 | /// If there is not a single line with at valid datetime, we assume the | |
206 | /// starttime to be the endtime | |
77bd2a46 | 207 | pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> { |
56b66645 DM |
208 | |
209 | let mut status = TaskState::Unknown { endtime: upid.starttime }; | |
4b01c983 | 210 | |
4494d078 | 211 | let path = upid.log_path(); |
4b01c983 | 212 | |
0bfd87bc DM |
213 | let mut file = File::open(path)?; |
214 | ||
215 | /// speedup - only read tail | |
216 | use std::io::Seek; | |
217 | use std::io::SeekFrom; | |
218 | let _ = file.seek(SeekFrom::End(-8192)); // ignore errors | |
219 | ||
56b66645 DM |
220 | let mut data = Vec::with_capacity(8192); |
221 | file.read_to_end(&mut data)?; | |
4b01c983 | 222 | |
a4c11436 | 223 | // task logs should end with newline, we do not want it here |
5e39918f | 224 | if data.len() > 0 && data[data.len()-1] == b'\n' { |
a4c11436 DC |
225 | data.pop(); |
226 | } | |
227 | ||
56b66645 DM |
228 | let last_line = { |
229 | let mut start = 0; | |
a4c11436 | 230 | for pos in (0..data.len()).rev() { |
56b66645 | 231 | if data[pos] == b'\n' { |
5e39918f | 232 | start = data.len().min(pos + 1); |
56b66645 DM |
233 | break; |
234 | } | |
ae197dda | 235 | } |
56b66645 DM |
236 | &data[start..] |
237 | }; | |
238 | ||
239 | let last_line = std::str::from_utf8(last_line) | |
240 | .map_err(|err| format_err!("upid_read_status: utf8 parse failed: {}", err))?; | |
241 | ||
242 | let mut iter = last_line.splitn(2, ": "); | |
243 | if let Some(time_str) = iter.next() { | |
6a7be83e | 244 | if let Ok(endtime) = proxmox::tools::time::parse_rfc3339(time_str) { |
56b66645 | 245 | if let Some(rest) = iter.next().and_then(|rest| rest.strip_prefix("TASK ")) { |
77bd2a46 | 246 | if let Ok(state) = TaskState::from_endtime_and_message(endtime, rest) { |
4c116baf | 247 | status = state; |
4b01c983 DM |
248 | } |
249 | } | |
250 | } | |
251 | } | |
252 | ||
77bd2a46 | 253 | Ok(status) |
4b01c983 DM |
254 | } |
255 | ||
/// Final state of a task, together with the time it ended.
///
/// `endtime` is an epoch timestamp, either taken from
/// `proxmox::tools::time::epoch_i64()` or parsed from a task log/list line.
///
/// NOTE(review): `PartialEq`/`Eq` are derived field-by-field, whereas the
/// `Ord` implementation for this type compares only `endtime`. Two states can
/// therefore compare `Equal` without being `==`.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskState {
    /// The task ended with an undefined state
    Unknown { endtime: i64 },
    /// The task ended and there were no errors or warnings
    OK { endtime: i64 },
    /// The task logged `count` warnings and ended without an error
    Warning { count: u64, endtime: i64 },
    /// The task ended with the error described in `message`
    Error { message: String, endtime: i64 },
}
268 | ||
269 | impl TaskState { | |
77bd2a46 DC |
270 | pub fn endtime(&self) -> i64 { |
271 | match *self { | |
272 | TaskState::Unknown { endtime } => endtime, | |
273 | TaskState::OK { endtime } => endtime, | |
274 | TaskState::Warning { endtime, .. } => endtime, | |
275 | TaskState::Error { endtime, .. } => endtime, | |
4c116baf DC |
276 | } |
277 | } | |
4c116baf | 278 | |
77bd2a46 | 279 | fn result_text(&self) -> String { |
4c116baf | 280 | match self { |
77bd2a46 DC |
281 | TaskState::Error { message, .. } => format!("TASK ERROR: {}", message), |
282 | other => format!("TASK {}", other), | |
4c116baf DC |
283 | } |
284 | } | |
4c116baf | 285 | |
77bd2a46 | 286 | fn from_endtime_and_message(endtime: i64, s: &str) -> Result<Self, Error> { |
4c116baf | 287 | if s == "unknown" { |
77bd2a46 | 288 | Ok(TaskState::Unknown { endtime }) |
4c116baf | 289 | } else if s == "OK" { |
77bd2a46 | 290 | Ok(TaskState::OK { endtime }) |
4c116baf DC |
291 | } else if s.starts_with("WARNINGS: ") { |
292 | let count: u64 = s[10..].parse()?; | |
77bd2a46 | 293 | Ok(TaskState::Warning{ count, endtime }) |
4c116baf DC |
294 | } else if s.len() > 0 { |
295 | let message = if s.starts_with("ERROR: ") { &s[7..] } else { s }.to_string(); | |
77bd2a46 | 296 | Ok(TaskState::Error{ message, endtime }) |
4c116baf DC |
297 | } else { |
298 | bail!("unable to parse Task Status '{}'", s); | |
299 | } | |
300 | } | |
301 | } | |
302 | ||
77bd2a46 DC |
303 | impl std::cmp::PartialOrd for TaskState { |
304 | fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { | |
305 | Some(self.endtime().cmp(&other.endtime())) | |
306 | } | |
307 | } | |
308 | ||
309 | impl std::cmp::Ord for TaskState { | |
310 | fn cmp(&self, other: &Self) -> std::cmp::Ordering { | |
311 | self.endtime().cmp(&other.endtime()) | |
312 | } | |
313 | } | |
314 | ||
315 | impl std::fmt::Display for TaskState { | |
316 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
317 | match self { | |
318 | TaskState::Unknown { .. } => write!(f, "unknown"), | |
319 | TaskState::OK { .. }=> write!(f, "OK"), | |
320 | TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count), | |
321 | TaskState::Error { message, .. } => write!(f, "{}", message), | |
322 | } | |
323 | } | |
324 | } | |
325 | ||
93aebb38 DM |
/// Task details including parsed UPID
///
/// If there is no `state`, the task is still running.
#[derive(Debug)]
pub struct TaskListInfo {
    /// The parsed UPID
    pub upid: UPID,
    /// UPID string representation
    pub upid_str: String,
    /// Final task state (which includes the end time) if already finished
    pub state: Option<TaskState>,
}
338 | ||
66f4e6a8 DC |
339 | fn lock_task_list_files(exclusive: bool) -> Result<std::fs::File, Error> { |
340 | let backup_user = crate::backup::backup_user()?; | |
341 | ||
342 | let lock = open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN, std::time::Duration::new(10, 0), exclusive)?; | |
343 | nix::unistd::chown(PROXMOX_BACKUP_TASK_LOCK_FN, Some(backup_user.uid), Some(backup_user.gid))?; | |
344 | ||
345 | Ok(lock) | |
346 | } | |
347 | ||
9a760917 DC |
348 | /// checks if the Task Archive is bigger that 'size_threshold' bytes, and |
349 | /// rotates it if it is | |
350 | pub fn rotate_task_log_archive(size_threshold: u64, compress: bool, max_files: Option<usize>) -> Result<bool, Error> { | |
351 | let _lock = lock_task_list_files(true)?; | |
352 | let path = Path::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN); | |
2d81f7b0 DC |
353 | let metadata = match path.metadata() { |
354 | Ok(metadata) => metadata, | |
355 | Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(false), | |
356 | Err(err) => bail!("unable to open task archive - {}", err), | |
357 | }; | |
358 | ||
9a760917 DC |
359 | if metadata.len() > size_threshold { |
360 | let mut logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, compress).ok_or_else(|| format_err!("could not get archive file names"))?; | |
361 | let backup_user = crate::backup::backup_user()?; | |
362 | logrotate.rotate( | |
363 | CreateOptions::new() | |
364 | .owner(backup_user.uid) | |
365 | .group(backup_user.gid), | |
366 | max_files, | |
367 | )?; | |
368 | Ok(true) | |
369 | } else { | |
370 | Ok(false) | |
371 | } | |
372 | } | |
373 | ||
93aebb38 DM |
374 | // atomically read/update the task list, update status of finished tasks |
375 | // new_upid is added to the list when specified. | |
c386b06f | 376 | fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> { |
4b01c983 | 377 | |
f74a03da | 378 | let backup_user = crate::backup::backup_user()?; |
35950380 | 379 | |
66f4e6a8 | 380 | let lock = lock_task_list_files(true)?; |
4b01c983 | 381 | |
784fa1c2 DC |
382 | let mut finish_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN)?; |
383 | let mut active_list: Vec<TaskListInfo> = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)? | |
384 | .into_iter() | |
385 | .filter_map(|info| { | |
386 | if info.state.is_some() { | |
387 | // this can happen when the active file still includes finished tasks | |
388 | finish_list.push(info); | |
389 | return None; | |
4b01c983 | 390 | } |
4b01c983 | 391 | |
784fa1c2 DC |
392 | if !worker_is_active_local(&info.upid) { |
393 | println!("Detected stopped UPID {}", &info.upid_str); | |
394 | let now = proxmox::tools::time::epoch_i64(); | |
395 | let status = upid_read_status(&info.upid) | |
396 | .unwrap_or_else(|_| TaskState::Unknown { endtime: now }); | |
397 | finish_list.push(TaskListInfo { | |
398 | upid: info.upid, | |
399 | upid_str: info.upid_str, | |
400 | state: Some(status) | |
401 | }); | |
402 | return None; | |
4b01c983 | 403 | } |
784fa1c2 DC |
404 | |
405 | Some(info) | |
406 | }).collect(); | |
4b01c983 DM |
407 | |
408 | if let Some(upid) = new_upid { | |
409 | active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None }); | |
410 | } | |
411 | ||
784fa1c2 | 412 | let active_raw = render_task_list(&active_list); |
4b01c983 | 413 | |
784fa1c2 DC |
414 | replace_file( |
415 | PROXMOX_BACKUP_ACTIVE_TASK_FN, | |
416 | active_raw.as_bytes(), | |
417 | CreateOptions::new() | |
418 | .owner(backup_user.uid) | |
419 | .group(backup_user.gid), | |
420 | )?; | |
93aebb38 | 421 | |
784fa1c2 | 422 | finish_list.sort_unstable_by(|a, b| { |
4b01c983 | 423 | match (&a.state, &b.state) { |
77bd2a46 | 424 | (Some(s1), Some(s2)) => s1.cmp(&s2), |
4b01c983 DM |
425 | (Some(_), None) => std::cmp::Ordering::Less, |
426 | (None, Some(_)) => std::cmp::Ordering::Greater, | |
427 | _ => a.upid.starttime.cmp(&b.upid.starttime), | |
428 | } | |
429 | }); | |
430 | ||
7eebe148 DC |
431 | |
432 | let start = if finish_list.len() > MAX_INDEX_TASKS { | |
433 | finish_list.len() - MAX_INDEX_TASKS | |
434 | } else { | |
435 | 0 | |
436 | }; | |
437 | ||
784fa1c2 | 438 | let end = (start+MAX_INDEX_TASKS).min(finish_list.len()); |
7eebe148 DC |
439 | |
440 | let index_raw = if end > start { | |
441 | render_task_list(&finish_list[start..end]) | |
442 | } else { | |
443 | "".to_string() | |
444 | }; | |
4b01c983 | 445 | |
feaa1ad3 | 446 | replace_file( |
784fa1c2 DC |
447 | PROXMOX_BACKUP_INDEX_TASK_FN, |
448 | index_raw.as_bytes(), | |
feaa1ad3 | 449 | CreateOptions::new() |
f74a03da DM |
450 | .owner(backup_user.uid) |
451 | .group(backup_user.gid), | |
feaa1ad3 | 452 | )?; |
4b01c983 | 453 | |
5ade6c25 DC |
454 | if !finish_list.is_empty() && start > 0 { |
455 | match std::fs::OpenOptions::new().append(true).create(true).open(PROXMOX_BACKUP_ARCHIVE_TASK_FN) { | |
456 | Ok(mut writer) => { | |
457 | for info in &finish_list[0..start] { | |
458 | writer.write_all(render_task_line(&info).as_bytes())?; | |
459 | } | |
460 | }, | |
461 | Err(err) => bail!("could not write task archive - {}", err), | |
462 | } | |
463 | ||
464 | nix::unistd::chown(PROXMOX_BACKUP_ARCHIVE_TASK_FN, Some(backup_user.uid), Some(backup_user.gid))?; | |
465 | } | |
466 | ||
4b01c983 DM |
467 | drop(lock); |
468 | ||
c386b06f | 469 | Ok(()) |
93aebb38 | 470 | } |
4b01c983 | 471 | |
bbeb0256 DC |
472 | fn render_task_line(info: &TaskListInfo) -> String { |
473 | let mut raw = String::new(); | |
474 | if let Some(status) = &info.state { | |
475 | raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status)); | |
476 | } else { | |
477 | raw.push_str(&info.upid_str); | |
478 | raw.push('\n'); | |
479 | } | |
480 | ||
481 | raw | |
482 | } | |
483 | ||
484 | fn render_task_list(list: &[TaskListInfo]) -> String { | |
485 | let mut raw = String::new(); | |
486 | for info in list { | |
487 | raw.push_str(&render_task_line(&info)); | |
488 | } | |
489 | raw | |
490 | } | |
491 | ||
784fa1c2 DC |
492 | // note this is not locked, caller has to make sure it is |
493 | // this will skip (and log) lines that are not valid status lines | |
494 | fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error> | |
495 | { | |
496 | let reader = BufReader::new(reader); | |
497 | let mut list = Vec::new(); | |
498 | for line in reader.lines() { | |
499 | let line = line?; | |
500 | match parse_worker_status_line(&line) { | |
501 | Ok((upid_str, upid, state)) => list.push(TaskListInfo { | |
502 | upid_str, | |
503 | upid, | |
504 | state | |
505 | }), | |
506 | Err(err) => { | |
507 | eprintln!("unable to parse worker status '{}' - {}", line, err); | |
508 | continue; | |
509 | } | |
510 | }; | |
511 | } | |
512 | ||
513 | Ok(list) | |
514 | } | |
515 | ||
516 | // note this is not locked, caller has to make sure it is | |
517 | fn read_task_file_from_path<P>(path: P) -> Result<Vec<TaskListInfo>, Error> | |
518 | where | |
519 | P: AsRef<std::path::Path> + std::fmt::Debug, | |
520 | { | |
521 | let file = match File::open(&path) { | |
522 | Ok(f) => f, | |
523 | Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()), | |
524 | Err(err) => bail!("unable to open task list {:?} - {}", path, err), | |
525 | }; | |
526 | ||
527 | read_task_file(file) | |
528 | } | |
529 | ||
e7244387 DC |
/// The task list source that `TaskListInfoIterator` loaded last.
enum TaskFile {
    /// Only the active task list is loaded; the index comes next.
    Active,
    /// The index file was loaded; archive files come next.
    Index,
    /// At least one archive file was loaded.
    Archive,
    /// No further files will be read.
    End,
}
536 | ||
/// Iterator over known tasks: first the active list, then the index file,
/// then the archive files. Within each file, entries are returned from the
/// last line to the first.
pub struct TaskListInfoIterator {
    /// Entries loaded but not yet returned (consumed from the back).
    list: VecDeque<TaskListInfo>,
    /// Task list source that was loaded last.
    file: TaskFile,
    /// Remaining archive files; `None` for active-only iteration or once exhausted.
    archive: Option<LogRotateFiles>,
    /// Shared task list lock, held until iteration ends (`None` for active-only).
    lock: Option<File>,
}
543 | ||
544 | impl TaskListInfoIterator { | |
545 | pub fn new(active_only: bool) -> Result<Self, Error> { | |
546 | let (read_lock, active_list) = { | |
547 | let lock = lock_task_list_files(false)?; | |
548 | let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?; | |
549 | ||
550 | let needs_update = active_list | |
551 | .iter() | |
df4827f2 | 552 | .any(|info| info.state.is_some() || !worker_is_active_local(&info.upid)); |
e7244387 DC |
553 | |
554 | if needs_update { | |
555 | drop(lock); | |
556 | update_active_workers(None)?; | |
557 | let lock = lock_task_list_files(false)?; | |
558 | let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?; | |
559 | (lock, active_list) | |
560 | } else { | |
561 | (lock, active_list) | |
562 | } | |
563 | }; | |
564 | ||
565 | let archive = if active_only { | |
566 | None | |
567 | } else { | |
e4f5f59e TL |
568 | let logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, true) |
569 | .ok_or_else(|| format_err!("could not get archive file names"))?; | |
e7244387 DC |
570 | Some(logrotate.files()) |
571 | }; | |
572 | ||
573 | let file = if active_only { TaskFile::End } else { TaskFile::Active }; | |
574 | let lock = if active_only { None } else { Some(read_lock) }; | |
575 | ||
576 | Ok(Self { | |
577 | list: active_list.into(), | |
578 | file, | |
579 | archive, | |
580 | lock, | |
581 | }) | |
582 | } | |
583 | } | |
584 | ||
585 | impl Iterator for TaskListInfoIterator { | |
586 | type Item = Result<TaskListInfo, Error>; | |
587 | ||
588 | fn next(&mut self) -> Option<Self::Item> { | |
589 | loop { | |
590 | if let Some(element) = self.list.pop_back() { | |
591 | return Some(Ok(element)); | |
592 | } else { | |
593 | match self.file { | |
594 | TaskFile::Active => { | |
595 | let index = match read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN) { | |
596 | Ok(index) => index, | |
597 | Err(err) => return Some(Err(err)), | |
598 | }; | |
599 | self.list.append(&mut index.into()); | |
600 | self.file = TaskFile::Index; | |
601 | }, | |
602 | TaskFile::Index | TaskFile::Archive => { | |
603 | if let Some(mut archive) = self.archive.take() { | |
604 | if let Some(file) = archive.next() { | |
605 | let list = match read_task_file(file) { | |
606 | Ok(list) => list, | |
607 | Err(err) => return Some(Err(err)), | |
608 | }; | |
609 | self.list.append(&mut list.into()); | |
610 | self.archive = Some(archive); | |
611 | self.file = TaskFile::Archive; | |
612 | continue; | |
613 | } | |
614 | } | |
615 | self.file = TaskFile::End; | |
616 | self.lock.take(); | |
617 | return None; | |
618 | } | |
619 | TaskFile::End => return None, | |
620 | } | |
621 | } | |
622 | } | |
623 | } | |
624 | } | |
625 | ||
882594c5 DM |
/// Launch long running worker tasks.
///
/// A worker task can either be a whole thread, or a simple tokio
/// task/future. Each task can `log()` messages, which are stored
/// persistently to files. Tasks should poll the `abort_requested`
/// flag, and stop execution when requested.
#[derive(Debug)]
pub struct WorkerTask {
    /// Task identifier; its string form is also the log file name.
    upid: UPID,
    /// Mutable task state (logger, progress, warning count, abort listeners).
    data: Mutex<WorkerTaskData>,
    /// Set by `request_abort()`, read by `abort_requested()`.
    abort_requested: AtomicBool,
}
638 | ||
/// Formats a worker by delegating to its UPID.
impl std::fmt::Display for WorkerTask {

    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        self.upid.fmt(f)
    }
}
645 | ||
/// Mutable per-task state, kept behind `WorkerTask::data`.
#[derive(Debug)]
struct WorkerTaskData {
    /// Writes the task log file.
    logger: FileLogger,
    progress: f64, // 0..1
    /// Number of messages logged through `WorkerTask::warn`.
    warn_count: u64,
    /// Senders notified once an abort is requested.
    pub abort_listeners: Vec<oneshot::Sender<()>>,
}
653 | ||
/// Debug output only. Removal from `WORKER_TASK_LIST` happens elsewhere
/// (e.g. in `log_result`), not here.
impl Drop for WorkerTask {

    fn drop(&mut self) {
        println!("unregister worker");
    }
}
660 | ||
661 | impl WorkerTask { | |
662 | ||
e7cb4dc5 | 663 | pub fn new(worker_type: &str, worker_id: Option<String>, userid: Userid, to_stdout: bool) -> Result<Arc<Self>, Error> { |
479f6e40 DM |
664 | println!("register worker"); |
665 | ||
e7cb4dc5 | 666 | let upid = UPID::new(worker_type, worker_id, userid)?; |
634132fe | 667 | let task_id = upid.task_id; |
479f6e40 | 668 | |
634132fe | 669 | let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR); |
35950380 | 670 | |
479f6e40 DM |
671 | path.push(format!("{:02X}", upid.pstart % 256)); |
672 | ||
f74a03da | 673 | let backup_user = crate::backup::backup_user()?; |
35950380 | 674 | |
f74a03da | 675 | create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?; |
479f6e40 DM |
676 | |
677 | path.push(upid.to_string()); | |
678 | ||
679 | println!("FILE: {:?}", path); | |
680 | ||
c0df91f8 TL |
681 | let logger_options = FileLogOptions { |
682 | to_stdout: to_stdout, | |
683 | exclusive: true, | |
e5adbc34 | 684 | prefix_time: true, |
c0df91f8 TL |
685 | read: true, |
686 | ..Default::default() | |
687 | }; | |
688 | let logger = FileLogger::new(&path, logger_options)?; | |
f74a03da | 689 | nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?; |
479f6e40 DM |
690 | |
691 | let worker = Arc::new(Self { | |
05d755b2 | 692 | upid: upid.clone(), |
479f6e40 DM |
693 | abort_requested: AtomicBool::new(false), |
694 | data: Mutex::new(WorkerTaskData { | |
695 | logger, | |
696 | progress: 0.0, | |
f6de2c73 | 697 | warn_count: 0, |
75bc49be | 698 | abort_listeners: vec![], |
479f6e40 DM |
699 | }), |
700 | }); | |
701 | ||
05d755b2 DC |
702 | // scope to drop the lock again after inserting |
703 | { | |
704 | let mut hash = WORKER_TASK_LIST.lock().unwrap(); | |
705 | hash.insert(task_id, worker.clone()); | |
706 | super::set_worker_count(hash.len()); | |
707 | } | |
7a630df7 | 708 | |
05d755b2 | 709 | update_active_workers(Some(&upid))?; |
479f6e40 DM |
710 | |
711 | Ok(worker) | |
712 | } | |
713 | ||
882594c5 | 714 | /// Spawn a new tokio task/future. |
660c6846 DM |
715 | pub fn spawn<F, T>( |
716 | worker_type: &str, | |
717 | worker_id: Option<String>, | |
e7cb4dc5 | 718 | userid: Userid, |
660c6846 DM |
719 | to_stdout: bool, |
720 | f: F, | |
721 | ) -> Result<String, Error> | |
479f6e40 | 722 | where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T, |
75fef4b4 | 723 | T: Send + 'static + Future<Output = Result<(), Error>>, |
479f6e40 | 724 | { |
e7cb4dc5 | 725 | let worker = WorkerTask::new(worker_type, worker_id, userid, to_stdout)?; |
660c6846 | 726 | let upid_str = worker.upid.to_string(); |
75fef4b4 WB |
727 | let f = f(worker.clone()); |
728 | tokio::spawn(async move { | |
729 | let result = f.await; | |
dd8e744f | 730 | worker.log_result(&result); |
75fef4b4 | 731 | }); |
479f6e40 | 732 | |
660c6846 | 733 | Ok(upid_str) |
479f6e40 DM |
734 | } |
735 | ||
882594c5 | 736 | /// Create a new worker thread. |
660c6846 DM |
737 | pub fn new_thread<F>( |
738 | worker_type: &str, | |
739 | worker_id: Option<String>, | |
e7cb4dc5 | 740 | userid: Userid, |
660c6846 DM |
741 | to_stdout: bool, |
742 | f: F, | |
743 | ) -> Result<String, Error> | |
d3f4c08f | 744 | where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error> |
479f6e40 DM |
745 | { |
746 | println!("register worker thread"); | |
747 | ||
e7cb4dc5 | 748 | let worker = WorkerTask::new(worker_type, worker_id, userid, to_stdout)?; |
660c6846 | 749 | let upid_str = worker.upid.to_string(); |
479f6e40 | 750 | |
217170e1 | 751 | let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || { |
d3f4c08f DM |
752 | let worker1 = worker.clone(); |
753 | let result = match std::panic::catch_unwind(move || f(worker1)) { | |
754 | Ok(r) => r, | |
755 | Err(panic) => { | |
756 | match panic.downcast::<&str>() { | |
757 | Ok(panic_msg) => { | |
758 | Err(format_err!("worker panicked: {}", panic_msg)) | |
759 | } | |
760 | Err(_) => { | |
761 | Err(format_err!("worker panicked: unknown type.")) | |
762 | } | |
763 | } | |
764 | } | |
765 | }; | |
766 | ||
dd8e744f | 767 | worker.log_result(&result); |
479f6e40 DM |
768 | }); |
769 | ||
660c6846 | 770 | Ok(upid_str) |
479f6e40 DM |
771 | } |
772 | ||
4c116baf DC |
773 | /// create state from self and a result |
774 | pub fn create_state(&self, result: &Result<(), Error>) -> TaskState { | |
f6de2c73 | 775 | let warn_count = self.data.lock().unwrap().warn_count; |
cef03f41 | 776 | |
6a7be83e | 777 | let endtime = proxmox::tools::time::epoch_i64(); |
77bd2a46 | 778 | |
4b01c983 | 779 | if let Err(err) = result { |
77bd2a46 | 780 | TaskState::Error { message: err.to_string(), endtime } |
f6de2c73 | 781 | } else if warn_count > 0 { |
77bd2a46 | 782 | TaskState::Warning { count: warn_count, endtime } |
4b01c983 | 783 | } else { |
77bd2a46 | 784 | TaskState::OK { endtime } |
4b01c983 | 785 | } |
cef03f41 DC |
786 | } |
787 | ||
788 | /// Log task result, remove task from running list | |
789 | pub fn log_result(&self, result: &Result<(), Error>) { | |
4c116baf DC |
790 | let state = self.create_state(result); |
791 | self.log(state.result_text()); | |
418def7a DM |
792 | |
793 | WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id); | |
794 | let _ = update_active_workers(None); | |
795 | super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len()); | |
4b01c983 DM |
796 | } |
797 | ||
882594c5 | 798 | /// Log a message. |
479f6e40 DM |
799 | pub fn log<S: AsRef<str>>(&self, msg: S) { |
800 | let mut data = self.data.lock().unwrap(); | |
801 | data.logger.log(msg); | |
802 | } | |
803 | ||
f6de2c73 DC |
804 | /// Log a message as warning. |
805 | pub fn warn<S: AsRef<str>>(&self, msg: S) { | |
806 | let mut data = self.data.lock().unwrap(); | |
807 | data.logger.log(format!("WARN: {}", msg.as_ref())); | |
808 | data.warn_count += 1; | |
809 | } | |
810 | ||
882594c5 | 811 | /// Set progress indicator |
479f6e40 DM |
812 | pub fn progress(&self, progress: f64) { |
813 | if progress >= 0.0 && progress <= 1.0 { | |
814 | let mut data = self.data.lock().unwrap(); | |
815 | data.progress = progress; | |
816 | } else { | |
817 | // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress); | |
818 | } | |
819 | } | |
820 | ||
882594c5 | 821 | /// Request abort |
d607b886 | 822 | pub fn request_abort(&self) { |
98a181f0 | 823 | eprintln!("set abort flag for worker {}", self.upid); |
479f6e40 | 824 | self.abort_requested.store(true, Ordering::SeqCst); |
75bc49be DM |
825 | // noitify listeners |
826 | let mut data = self.data.lock().unwrap(); | |
827 | loop { | |
828 | match data.abort_listeners.pop() { | |
829 | None => { break; }, | |
830 | Some(ch) => { | |
831 | let _ = ch.send(()); // ignore erros here | |
832 | }, | |
833 | } | |
834 | } | |
479f6e40 DM |
835 | } |
836 | ||
882594c5 | 837 | /// Test if abort was requested. |
479f6e40 DM |
838 | pub fn abort_requested(&self) -> bool { |
839 | self.abort_requested.load(Ordering::SeqCst) | |
840 | } | |
841 | ||
882594c5 | 842 | /// Fail if abort was requested. |
479f6e40 DM |
843 | pub fn fail_on_abort(&self) -> Result<(), Error> { |
844 | if self.abort_requested() { | |
99641a6b | 845 | bail!("abort requested - aborting task"); |
479f6e40 DM |
846 | } |
847 | Ok(()) | |
848 | } | |
75bc49be DM |
849 | |
850 | /// Get a future which resolves on task abort | |
851 | pub fn abort_future(&self) -> oneshot::Receiver<()> { | |
852 | let (tx, rx) = oneshot::channel::<()>(); | |
853 | ||
854 | let mut data = self.data.lock().unwrap(); | |
855 | if self.abort_requested() { | |
856 | let _ = tx.send(()); | |
857 | } else { | |
858 | data.abort_listeners.push(tx); | |
859 | } | |
860 | rx | |
861 | } | |
4bd2a9e4 DC |
862 | |
863 | pub fn upid(&self) -> &UPID { | |
864 | &self.upid | |
865 | } | |
479f6e40 | 866 | } |
d1993187 WB |
867 | |
868 | impl crate::task::TaskState for WorkerTask { | |
869 | fn check_abort(&self) -> Result<(), Error> { | |
870 | self.fail_on_abort() | |
871 | } | |
872 | ||
873 | fn log(&self, level: log::Level, message: &std::fmt::Arguments) { | |
874 | match level { | |
875 | log::Level::Error => self.warn(&message.to_string()), | |
876 | log::Level::Warn => self.warn(&message.to_string()), | |
877 | log::Level::Info => self.log(&message.to_string()), | |
878 | log::Level::Debug => self.log(&format!("DEBUG: {}", message)), | |
879 | log::Level::Trace => self.log(&format!("TRACE: {}", message)), | |
880 | } | |
881 | } | |
882 | } |