]>
Commit | Line | Data |
---|---|---|
1 | use std::collections::HashMap; | |
2 | use std::fs::File; | |
3 | use std::io::{BufRead, BufReader}; | |
4 | use std::panic::UnwindSafe; | |
5 | use std::sync::atomic::{AtomicBool, Ordering}; | |
6 | use std::sync::{Arc, Mutex}; | |
7 | ||
8 | use chrono::Local; | |
9 | use failure::*; | |
10 | use futures::*; | |
11 | use lazy_static::lazy_static; | |
12 | use nix::unistd::Pid; | |
13 | use serde_json::{json, Value}; | |
14 | use tokio::sync::oneshot; | |
15 | ||
16 | use proxmox::sys::linux::procfs; | |
17 | use proxmox::tools::{ | |
18 | try_block, | |
19 | fs::{create_path, replace_file, CreateOptions}, | |
20 | }; | |
21 | ||
22 | use super::UPID; | |
23 | ||
24 | use crate::tools::FileLogger; | |
25 | ||
26 | macro_rules! PROXMOX_BACKUP_VAR_RUN_DIR_M { () => ("/run/proxmox-backup") } | |
27 | macro_rules! PROXMOX_BACKUP_LOG_DIR_M { () => ("/var/log/proxmox-backup") } | |
28 | macro_rules! PROXMOX_BACKUP_TASK_DIR_M { () => (concat!( PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks")) } | |
29 | ||
30 | pub const PROXMOX_BACKUP_VAR_RUN_DIR: &str = PROXMOX_BACKUP_VAR_RUN_DIR_M!(); | |
31 | pub const PROXMOX_BACKUP_LOG_DIR: &str = PROXMOX_BACKUP_LOG_DIR_M!(); | |
32 | pub const PROXMOX_BACKUP_TASK_DIR: &str = PROXMOX_BACKUP_TASK_DIR_M!(); | |
33 | pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/.active.lock"); | |
34 | pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/active"); | |
35 | ||
36 | lazy_static! { | |
37 | static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new()); | |
38 | ||
39 | static ref MY_PID: i32 = unsafe { libc::getpid() }; | |
40 | static ref MY_PID_PSTART: u64 = procfs::PidStat::read_from_pid(Pid::from_raw(*MY_PID)) | |
41 | .unwrap() | |
42 | .starttime; | |
43 | } | |
44 | ||
45 | /// Test if the task is still running | |
46 | pub fn worker_is_active(upid: &UPID) -> bool { | |
47 | ||
48 | if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) { | |
49 | WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) | |
50 | } else { | |
51 | procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() | |
52 | } | |
53 | } | |
54 | ||
55 | pub fn create_task_control_socket() -> Result<(), Error> { | |
56 | ||
57 | let socketname = format!( | |
58 | "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, *MY_PID); | |
59 | ||
60 | let control_future = super::create_control_socket(socketname, |param| { | |
61 | let param = param.as_object() | |
62 | .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?; | |
63 | if param.keys().count() != 2 { bail!("wrong number of parameters"); } | |
64 | ||
65 | let command = param.get("command") | |
66 | .ok_or_else(|| format_err!("unable to parse parameters (missing command)"))?; | |
67 | ||
68 | // this is the only command for now | |
69 | if command != "abort-task" { bail!("got unknown command '{}'", command); } | |
70 | ||
71 | let upid_str = param["upid"].as_str() | |
72 | .ok_or_else(|| format_err!("unable to parse parameters (missing upid)"))?; | |
73 | ||
74 | let upid = upid_str.parse::<UPID>()?; | |
75 | ||
76 | if !((upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART)) { | |
77 | bail!("upid does not belong to this process"); | |
78 | } | |
79 | ||
80 | let hash = WORKER_TASK_LIST.lock().unwrap(); | |
81 | if let Some(ref worker) = hash.get(&upid.task_id) { | |
82 | worker.request_abort(); | |
83 | } else { | |
84 | // assume task is already stopped | |
85 | } | |
86 | Ok(Value::Null) | |
87 | })?; | |
88 | ||
89 | tokio::spawn(control_future); | |
90 | ||
91 | Ok(()) | |
92 | } | |
93 | ||
94 | pub fn abort_worker_async(upid: UPID) { | |
95 | tokio::spawn(async move { | |
96 | if let Err(err) = abort_worker(upid).await { | |
97 | eprintln!("abort worker failed - {}", err); | |
98 | } | |
99 | }); | |
100 | } | |
101 | ||
102 | pub fn abort_worker(upid: UPID) -> impl Future<Output = Result<(), Error>> { | |
103 | ||
104 | let target_pid = upid.pid; | |
105 | ||
106 | let socketname = format!( | |
107 | "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, target_pid); | |
108 | ||
109 | let cmd = json!({ | |
110 | "command": "abort-task", | |
111 | "upid": upid.to_string(), | |
112 | }); | |
113 | ||
114 | super::send_command(socketname, cmd).map_ok(|_| ()) | |
115 | } | |
116 | ||
117 | fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, String)>), Error> { | |
118 | ||
119 | let data = line.splitn(3, ' ').collect::<Vec<&str>>(); | |
120 | ||
121 | let len = data.len(); | |
122 | ||
123 | match len { | |
124 | 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)), | |
125 | 3 => { | |
126 | let endtime = i64::from_str_radix(data[1], 16)?; | |
127 | Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some((endtime, data[2].to_owned())))) | |
128 | } | |
129 | _ => bail!("wrong number of components"), | |
130 | } | |
131 | } | |
132 | ||
133 | /// Create task log directory with correct permissions | |
134 | pub fn create_task_log_dirs() -> Result<(), Error> { | |
135 | ||
136 | try_block!({ | |
137 | let backup_user = crate::backup::backup_user()?; | |
138 | let opts = CreateOptions::new() | |
139 | .owner(backup_user.uid) | |
140 | .group(backup_user.gid); | |
141 | ||
142 | create_path(PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?; | |
143 | create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?; | |
144 | create_path(PROXMOX_BACKUP_VAR_RUN_DIR, None, Some(opts))?; | |
145 | Ok(()) | |
146 | }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?; | |
147 | ||
148 | Ok(()) | |
149 | } | |
150 | ||
151 | /// Read exits status from task log file | |
152 | pub fn upid_read_status(upid: &UPID) -> Result<String, Error> { | |
153 | let mut status = String::from("unknown"); | |
154 | ||
155 | let path = upid.log_path(); | |
156 | ||
157 | let mut file = File::open(path)?; | |
158 | ||
159 | /// speedup - only read tail | |
160 | use std::io::Seek; | |
161 | use std::io::SeekFrom; | |
162 | let _ = file.seek(SeekFrom::End(-8192)); // ignore errors | |
163 | ||
164 | let reader = BufReader::new(file); | |
165 | ||
166 | for line in reader.lines() { | |
167 | let line = line?; | |
168 | ||
169 | let mut iter = line.splitn(2, ": TASK "); | |
170 | if iter.next() == None { continue; } | |
171 | match iter.next() { | |
172 | None => continue, | |
173 | Some(rest) => { | |
174 | if rest == "OK" { | |
175 | status = String::from(rest); | |
176 | } else if rest.starts_with("ERROR: ") { | |
177 | status = String::from(&rest[7..]); | |
178 | } | |
179 | } | |
180 | } | |
181 | } | |
182 | ||
183 | Ok(status) | |
184 | } | |
185 | ||
186 | /// Task details including parsed UPID | |
187 | /// | |
188 | /// If there is no `state`, the task is still running. | |
189 | #[derive(Debug)] | |
190 | pub struct TaskListInfo { | |
191 | /// The parsed UPID | |
192 | pub upid: UPID, | |
193 | /// UPID string representation | |
194 | pub upid_str: String, | |
195 | /// Task `(endtime, status)` if already finished | |
196 | /// | |
197 | /// The `status` ise iether `unknown`, `OK`, or `ERROR: ...` | |
198 | pub state: Option<(i64, String)>, // endtime, status | |
199 | } | |
200 | ||
201 | // atomically read/update the task list, update status of finished tasks | |
202 | // new_upid is added to the list when specified. | |
203 | // Returns a sorted list of known tasks, | |
204 | fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, Error> { | |
205 | ||
206 | let backup_user = crate::backup::backup_user()?; | |
207 | ||
208 | let lock = crate::tools::open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN, std::time::Duration::new(10, 0))?; | |
209 | nix::unistd::chown(PROXMOX_BACKUP_TASK_LOCK_FN, Some(backup_user.uid), Some(backup_user.gid))?; | |
210 | ||
211 | let reader = match File::open(PROXMOX_BACKUP_ACTIVE_TASK_FN) { | |
212 | Ok(f) => Some(BufReader::new(f)), | |
213 | Err(err) => { | |
214 | if err.kind() == std::io::ErrorKind::NotFound { | |
215 | None | |
216 | } else { | |
217 | bail!("unable to open active worker {:?} - {}", PROXMOX_BACKUP_ACTIVE_TASK_FN, err); | |
218 | } | |
219 | } | |
220 | }; | |
221 | ||
222 | let mut active_list = vec![]; | |
223 | let mut finish_list = vec![]; | |
224 | ||
225 | if let Some(lines) = reader.map(|r| r.lines()) { | |
226 | ||
227 | for line in lines { | |
228 | let line = line?; | |
229 | match parse_worker_status_line(&line) { | |
230 | Err(err) => bail!("unable to parse active worker status '{}' - {}", line, err), | |
231 | Ok((upid_str, upid, state)) => { | |
232 | ||
233 | let running = worker_is_active(&upid); | |
234 | ||
235 | if running { | |
236 | active_list.push(TaskListInfo { upid, upid_str, state: None }); | |
237 | } else { | |
238 | match state { | |
239 | None => { | |
240 | println!("Detected stoped UPID {}", upid_str); | |
241 | let status = upid_read_status(&upid) | |
242 | .unwrap_or_else(|_| String::from("unknown")); | |
243 | finish_list.push(TaskListInfo { | |
244 | upid, upid_str, state: Some((Local::now().timestamp(), status)) | |
245 | }); | |
246 | } | |
247 | Some((endtime, status)) => { | |
248 | finish_list.push(TaskListInfo { | |
249 | upid, upid_str, state: Some((endtime, status)) | |
250 | }) | |
251 | } | |
252 | } | |
253 | } | |
254 | } | |
255 | } | |
256 | } | |
257 | } | |
258 | ||
259 | if let Some(upid) = new_upid { | |
260 | active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None }); | |
261 | } | |
262 | ||
263 | // assemble list without duplicates | |
264 | // we include all active tasks, | |
265 | // and fill up to 1000 entries with finished tasks | |
266 | ||
267 | let max = 1000; | |
268 | ||
269 | let mut task_hash = HashMap::new(); | |
270 | ||
271 | for info in active_list { | |
272 | task_hash.insert(info.upid_str.clone(), info); | |
273 | } | |
274 | ||
275 | for info in finish_list { | |
276 | if task_hash.len() > max { break; } | |
277 | if !task_hash.contains_key(&info.upid_str) { | |
278 | task_hash.insert(info.upid_str.clone(), info); | |
279 | } | |
280 | } | |
281 | ||
282 | let mut task_list: Vec<TaskListInfo> = vec![]; | |
283 | for (_, info) in task_hash { task_list.push(info); } | |
284 | ||
285 | task_list.sort_unstable_by(|b, a| { // lastest on top | |
286 | match (&a.state, &b.state) { | |
287 | (Some(s1), Some(s2)) => s1.0.cmp(&s2.0), | |
288 | (Some(_), None) => std::cmp::Ordering::Less, | |
289 | (None, Some(_)) => std::cmp::Ordering::Greater, | |
290 | _ => a.upid.starttime.cmp(&b.upid.starttime), | |
291 | } | |
292 | }); | |
293 | ||
294 | let mut raw = String::new(); | |
295 | for info in &task_list { | |
296 | if let Some((endtime, status)) = &info.state { | |
297 | raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, endtime, status)); | |
298 | } else { | |
299 | raw.push_str(&info.upid_str); | |
300 | raw.push('\n'); | |
301 | } | |
302 | } | |
303 | ||
304 | replace_file( | |
305 | PROXMOX_BACKUP_ACTIVE_TASK_FN, | |
306 | raw.as_bytes(), | |
307 | CreateOptions::new() | |
308 | .owner(backup_user.uid) | |
309 | .group(backup_user.gid), | |
310 | )?; | |
311 | ||
312 | drop(lock); | |
313 | ||
314 | Ok(task_list) | |
315 | } | |
316 | ||
317 | /// Returns a sorted list of known tasks | |
318 | /// | |
319 | /// The list is sorted by `(starttime, endtime)` in ascending order | |
320 | pub fn read_task_list() -> Result<Vec<TaskListInfo>, Error> { | |
321 | update_active_workers(None) | |
322 | } | |
323 | ||
324 | /// Launch long running worker tasks. | |
325 | /// | |
326 | /// A worker task can either be a whole thread, or a simply tokio | |
327 | /// task/future. Each task can `log()` messages, which are stored | |
328 | /// persistently to files. Task should poll the `abort_requested` | |
329 | /// flag, and stop execution when requested. | |
330 | #[derive(Debug)] | |
331 | pub struct WorkerTask { | |
332 | upid: UPID, | |
333 | data: Mutex<WorkerTaskData>, | |
334 | abort_requested: AtomicBool, | |
335 | } | |
336 | ||
337 | impl std::fmt::Display for WorkerTask { | |
338 | ||
339 | fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { | |
340 | self.upid.fmt(f) | |
341 | } | |
342 | } | |
343 | ||
344 | #[derive(Debug)] | |
345 | struct WorkerTaskData { | |
346 | logger: FileLogger, | |
347 | progress: f64, // 0..1 | |
348 | pub abort_listeners: Vec<oneshot::Sender<()>>, | |
349 | } | |
350 | ||
351 | impl Drop for WorkerTask { | |
352 | ||
353 | fn drop(&mut self) { | |
354 | println!("unregister worker"); | |
355 | } | |
356 | } | |
357 | ||
358 | impl WorkerTask { | |
359 | ||
360 | pub fn new(worker_type: &str, worker_id: Option<String>, username: &str, to_stdout: bool) -> Result<Arc<Self>, Error> { | |
361 | println!("register worker"); | |
362 | ||
363 | let upid = UPID::new(worker_type, worker_id, username)?; | |
364 | let task_id = upid.task_id; | |
365 | ||
366 | let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR); | |
367 | ||
368 | path.push(format!("{:02X}", upid.pstart % 256)); | |
369 | ||
370 | let backup_user = crate::backup::backup_user()?; | |
371 | ||
372 | create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?; | |
373 | ||
374 | path.push(upid.to_string()); | |
375 | ||
376 | println!("FILE: {:?}", path); | |
377 | ||
378 | let logger = FileLogger::new(&path, to_stdout)?; | |
379 | nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?; | |
380 | ||
381 | update_active_workers(Some(&upid))?; | |
382 | ||
383 | let worker = Arc::new(Self { | |
384 | upid, | |
385 | abort_requested: AtomicBool::new(false), | |
386 | data: Mutex::new(WorkerTaskData { | |
387 | logger, | |
388 | progress: 0.0, | |
389 | abort_listeners: vec![], | |
390 | }), | |
391 | }); | |
392 | ||
393 | let mut hash = WORKER_TASK_LIST.lock().unwrap(); | |
394 | ||
395 | hash.insert(task_id, worker.clone()); | |
396 | super::set_worker_count(hash.len()); | |
397 | ||
398 | Ok(worker) | |
399 | } | |
400 | ||
401 | /// Spawn a new tokio task/future. | |
402 | pub fn spawn<F, T>( | |
403 | worker_type: &str, | |
404 | worker_id: Option<String>, | |
405 | username: &str, | |
406 | to_stdout: bool, | |
407 | f: F, | |
408 | ) -> Result<String, Error> | |
409 | where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T, | |
410 | T: Send + 'static + Future<Output = Result<(), Error>>, | |
411 | { | |
412 | let worker = WorkerTask::new(worker_type, worker_id, username, to_stdout)?; | |
413 | let upid_str = worker.upid.to_string(); | |
414 | let f = f(worker.clone()); | |
415 | tokio::spawn(async move { | |
416 | let result = f.await; | |
417 | worker.log_result(&result); | |
418 | }); | |
419 | ||
420 | Ok(upid_str) | |
421 | } | |
422 | ||
423 | /// Create a new worker thread. | |
424 | pub fn new_thread<F>( | |
425 | worker_type: &str, | |
426 | worker_id: Option<String>, | |
427 | username: &str, | |
428 | to_stdout: bool, | |
429 | f: F, | |
430 | ) -> Result<String, Error> | |
431 | where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error> | |
432 | { | |
433 | println!("register worker thread"); | |
434 | ||
435 | let (p, c) = oneshot::channel::<()>(); | |
436 | ||
437 | let worker = WorkerTask::new(worker_type, worker_id, username, to_stdout)?; | |
438 | let upid_str = worker.upid.to_string(); | |
439 | ||
440 | let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || { | |
441 | let worker1 = worker.clone(); | |
442 | let result = match std::panic::catch_unwind(move || f(worker1)) { | |
443 | Ok(r) => r, | |
444 | Err(panic) => { | |
445 | match panic.downcast::<&str>() { | |
446 | Ok(panic_msg) => { | |
447 | Err(format_err!("worker panicked: {}", panic_msg)) | |
448 | } | |
449 | Err(_) => { | |
450 | Err(format_err!("worker panicked: unknown type.")) | |
451 | } | |
452 | } | |
453 | } | |
454 | }; | |
455 | ||
456 | worker.log_result(&result); | |
457 | p.send(()).unwrap(); | |
458 | }); | |
459 | ||
460 | tokio::spawn(c.map(|_| ())); | |
461 | ||
462 | Ok(upid_str) | |
463 | } | |
464 | ||
465 | /// Log task result, remove task from running list | |
466 | pub fn log_result(&self, result: &Result<(), Error>) { | |
467 | ||
468 | if let Err(err) = result { | |
469 | self.log(&format!("TASK ERROR: {}", err)); | |
470 | } else { | |
471 | self.log("TASK OK"); | |
472 | } | |
473 | ||
474 | WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id); | |
475 | let _ = update_active_workers(None); | |
476 | super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len()); | |
477 | } | |
478 | ||
479 | /// Log a message. | |
480 | pub fn log<S: AsRef<str>>(&self, msg: S) { | |
481 | let mut data = self.data.lock().unwrap(); | |
482 | data.logger.log(msg); | |
483 | } | |
484 | ||
485 | /// Set progress indicator | |
486 | pub fn progress(&self, progress: f64) { | |
487 | if progress >= 0.0 && progress <= 1.0 { | |
488 | let mut data = self.data.lock().unwrap(); | |
489 | data.progress = progress; | |
490 | } else { | |
491 | // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress); | |
492 | } | |
493 | } | |
494 | ||
495 | /// Request abort | |
496 | pub fn request_abort(&self) { | |
497 | eprintln!("set abort flag for worker {}", self.upid); | |
498 | self.abort_requested.store(true, Ordering::SeqCst); | |
499 | // noitify listeners | |
500 | let mut data = self.data.lock().unwrap(); | |
501 | loop { | |
502 | match data.abort_listeners.pop() { | |
503 | None => { break; }, | |
504 | Some(ch) => { | |
505 | let _ = ch.send(()); // ignore erros here | |
506 | }, | |
507 | } | |
508 | } | |
509 | } | |
510 | ||
511 | /// Test if abort was requested. | |
512 | pub fn abort_requested(&self) -> bool { | |
513 | self.abort_requested.load(Ordering::SeqCst) | |
514 | } | |
515 | ||
516 | /// Fail if abort was requested. | |
517 | pub fn fail_on_abort(&self) -> Result<(), Error> { | |
518 | if self.abort_requested() { | |
519 | bail!("task '{}': abort requested - aborting task", self.upid); | |
520 | } | |
521 | Ok(()) | |
522 | } | |
523 | ||
524 | /// Get a future which resolves on task abort | |
525 | pub fn abort_future(&self) -> oneshot::Receiver<()> { | |
526 | let (tx, rx) = oneshot::channel::<()>(); | |
527 | ||
528 | let mut data = self.data.lock().unwrap(); | |
529 | if self.abort_requested() { | |
530 | let _ = tx.send(()); | |
531 | } else { | |
532 | data.abort_listeners.push(tx); | |
533 | } | |
534 | rx | |
535 | } | |
536 | } |