]>
Commit | Line | Data |
---|---|---|
479f6e40 | 1 | use std::collections::HashMap; |
4b01c983 | 2 | use std::fs::File; |
18c0df4c | 3 | use std::io::{BufRead, BufReader}; |
d3f4c08f | 4 | use std::panic::UnwindSafe; |
18c0df4c WB |
5 | use std::sync::atomic::{AtomicBool, Ordering}; |
6 | use std::sync::{Arc, Mutex}; | |
d3f4c08f | 7 | |
18c0df4c | 8 | use chrono::Local; |
f7d4e4b5 | 9 | use anyhow::{bail, format_err, Error}; |
18c0df4c WB |
10 | use futures::*; |
11 | use lazy_static::lazy_static; | |
619495b2 | 12 | use nix::unistd::Pid; |
321070b4 | 13 | use serde_json::{json, Value}; |
4c116baf | 14 | use serde::{Serialize, Deserialize}; |
18c0df4c | 15 | use tokio::sync::oneshot; |
479f6e40 | 16 | |
619495b2 | 17 | use proxmox::sys::linux::procfs; |
9ea4bce4 | 18 | use proxmox::try_block; |
98c259b4 | 19 | use proxmox::tools::fs::{create_path, open_file_locked, replace_file, CreateOptions}; |
e18a6c9e | 20 | |
634132fe DM |
21 | use super::UPID; |
22 | ||
e18a6c9e | 23 | use crate::tools::FileLogger; |
e7cb4dc5 | 24 | use crate::api2::types::Userid; |
479f6e40 | 25 | |
// The directory names are defined as macros (in addition to the constants
// below) so that they can be used as literals inside `concat!()`.
macro_rules! PROXMOX_BACKUP_VAR_RUN_DIR_M { () => ("/run/proxmox-backup") }
macro_rules! PROXMOX_BACKUP_LOG_DIR_M { () => ("/var/log/proxmox-backup") }
macro_rules! PROXMOX_BACKUP_TASK_DIR_M { () => (concat!( PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks")) }

/// Runtime directory, also used as prefix for the task control socket names.
pub const PROXMOX_BACKUP_VAR_RUN_DIR: &str = PROXMOX_BACKUP_VAR_RUN_DIR_M!();
/// Base directory for log files.
pub const PROXMOX_BACKUP_LOG_DIR: &str = PROXMOX_BACKUP_LOG_DIR_M!();
/// Directory below the log directory which holds the task logs.
pub const PROXMOX_BACKUP_TASK_DIR: &str = PROXMOX_BACKUP_TASK_DIR_M!();
/// Lock file taken while the active task list is read and rewritten.
pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/.active.lock");
/// File containing the list of known (running and finished) tasks.
pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/active");
479f6e40 DM |
35 | |
lazy_static! {
    /// Workers registered in this process, keyed by the `task_id` of their UPID.
    static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());

    /// PID of this process.
    // SAFETY: `getpid()` takes no arguments and has no preconditions.
    static ref MY_PID: i32 = unsafe { libc::getpid() };
    /// Start time of this process as read from its procfs stat entry.
    // NOTE(review): the `unwrap()` panics on first access if the stat entry
    // of our own PID cannot be read.
    static ref MY_PID_PSTART: u64 = procfs::PidStat::read_from_pid(Pid::from_raw(*MY_PID))
        .unwrap()
        .starttime;
}
44 | ||
634132fe | 45 | /// Test if the task is still running |
5751e495 DM |
46 | pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> { |
47 | if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) { | |
48 | return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)); | |
49 | } | |
50 | ||
51 | if !procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() { | |
52 | return Ok(false); | |
53 | } | |
54 | ||
55 | let socketname = format!( | |
56 | "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, upid.pid); | |
57 | ||
58 | let cmd = json!({ | |
59 | "command": "status", | |
60 | "upid": upid.to_string(), | |
61 | }); | |
62 | ||
63 | let status = super::send_command(socketname, cmd).await?; | |
4494d078 | 64 | |
5751e495 DM |
65 | if let Some(active) = status.as_bool() { |
66 | Ok(active) | |
67 | } else { | |
68 | bail!("got unexpected result {:?} (expected bool)", status); | |
69 | } | |
70 | } | |
71 | ||
72 | /// Test if the task is still running (fast but inaccurate implementation) | |
73 | /// | |
74 | /// If the task is spanned from a different process, we simply return if | |
75 | /// that process is still running. This information is good enough to detect | |
76 | /// stale tasks... | |
77ebbefc | 77 | pub fn worker_is_active_local(upid: &UPID) -> bool { |
634132fe | 78 | if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) { |
62ee2eb4 | 79 | WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) |
634132fe | 80 | } else { |
62ee2eb4 | 81 | procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() |
479f6e40 DM |
82 | } |
83 | } | |
84 | ||
d607b886 DM |
85 | pub fn create_task_control_socket() -> Result<(), Error> { |
86 | ||
87 | let socketname = format!( | |
9b002cbc | 88 | "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, *MY_PID); |
d607b886 | 89 | |
9b002cbc | 90 | let control_future = super::create_control_socket(socketname, |param| { |
d607b886 | 91 | let param = param.as_object() |
62ee2eb4 | 92 | .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?; |
321070b4 | 93 | if param.keys().count() != 2 { bail!("wrong number of parameters"); } |
d607b886 | 94 | |
5751e495 | 95 | let command = param["command"].as_str() |
62ee2eb4 | 96 | .ok_or_else(|| format_err!("unable to parse parameters (missing command)"))?; |
d607b886 | 97 | |
5751e495 DM |
98 | // we have only two commands for now |
99 | if !(command == "abort-task" || command == "status") { bail!("got unknown command '{}'", command); } | |
d607b886 DM |
100 | |
101 | let upid_str = param["upid"].as_str() | |
62ee2eb4 | 102 | .ok_or_else(|| format_err!("unable to parse parameters (missing upid)"))?; |
d607b886 DM |
103 | |
104 | let upid = upid_str.parse::<UPID>()?; | |
105 | ||
106 | if !((upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART)) { | |
107 | bail!("upid does not belong to this process"); | |
108 | } | |
109 | ||
110 | let hash = WORKER_TASK_LIST.lock().unwrap(); | |
5751e495 DM |
111 | |
112 | match command { | |
113 | "abort-task" => { | |
114 | if let Some(ref worker) = hash.get(&upid.task_id) { | |
115 | worker.request_abort(); | |
116 | } else { | |
117 | // assume task is already stopped | |
118 | } | |
119 | Ok(Value::Null) | |
120 | } | |
121 | "status" => { | |
122 | let active = hash.contains_key(&upid.task_id); | |
123 | Ok(active.into()) | |
124 | } | |
125 | _ => { | |
126 | bail!("got unknown command '{}'", command); | |
127 | } | |
d607b886 | 128 | } |
d607b886 DM |
129 | })?; |
130 | ||
131 | tokio::spawn(control_future); | |
132 | ||
133 | Ok(()) | |
134 | } | |
135 | ||
321070b4 | 136 | pub fn abort_worker_async(upid: UPID) { |
75fef4b4 WB |
137 | tokio::spawn(async move { |
138 | if let Err(err) = abort_worker(upid).await { | |
321070b4 DM |
139 | eprintln!("abort worker failed - {}", err); |
140 | } | |
75fef4b4 | 141 | }); |
321070b4 DM |
142 | } |
143 | ||
5751e495 | 144 | pub async fn abort_worker(upid: UPID) -> Result<(), Error> { |
321070b4 DM |
145 | |
146 | let target_pid = upid.pid; | |
147 | ||
148 | let socketname = format!( | |
149 | "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, target_pid); | |
150 | ||
151 | let cmd = json!({ | |
152 | "command": "abort-task", | |
153 | "upid": upid.to_string(), | |
154 | }); | |
155 | ||
5751e495 | 156 | super::send_command(socketname, cmd).map_ok(|_| ()).await |
321070b4 DM |
157 | } |
158 | ||
4b01c983 DM |
159 | fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, String)>), Error> { |
160 | ||
161 | let data = line.splitn(3, ' ').collect::<Vec<&str>>(); | |
162 | ||
163 | let len = data.len(); | |
164 | ||
165 | match len { | |
166 | 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)), | |
167 | 3 => { | |
168 | let endtime = i64::from_str_radix(data[1], 16)?; | |
169 | Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some((endtime, data[2].to_owned())))) | |
170 | } | |
171 | _ => bail!("wrong number of components"), | |
172 | } | |
173 | } | |
174 | ||
35950380 | 175 | /// Create task log directory with correct permissions |
d607b886 | 176 | pub fn create_task_log_dirs() -> Result<(), Error> { |
35950380 DM |
177 | |
178 | try_block!({ | |
f74a03da | 179 | let backup_user = crate::backup::backup_user()?; |
35238e23 | 180 | let opts = CreateOptions::new() |
f74a03da DM |
181 | .owner(backup_user.uid) |
182 | .group(backup_user.gid); | |
35950380 | 183 | |
35238e23 WB |
184 | create_path(PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?; |
185 | create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?; | |
186 | create_path(PROXMOX_BACKUP_VAR_RUN_DIR, None, Some(opts))?; | |
35950380 DM |
187 | Ok(()) |
188 | }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?; | |
189 | ||
190 | Ok(()) | |
191 | } | |
192 | ||
ae197dda DC |
193 | /// Read endtime (time of last log line) and exitstatus from task log file |
194 | /// If there is not a single line with at valid datetime, we assume the | |
195 | /// starttime to be the endtime | |
196 | pub fn upid_read_status(upid: &UPID) -> Result<(i64, TaskState), Error> { | |
4c116baf | 197 | let mut status = TaskState::Unknown; |
ae197dda | 198 | let mut time = upid.starttime; |
4b01c983 | 199 | |
4494d078 | 200 | let path = upid.log_path(); |
4b01c983 | 201 | |
0bfd87bc DM |
202 | let mut file = File::open(path)?; |
203 | ||
204 | /// speedup - only read tail | |
205 | use std::io::Seek; | |
206 | use std::io::SeekFrom; | |
207 | let _ = file.seek(SeekFrom::End(-8192)); // ignore errors | |
208 | ||
4b01c983 DM |
209 | let reader = BufReader::new(file); |
210 | ||
211 | for line in reader.lines() { | |
212 | let line = line?; | |
213 | ||
ae197dda DC |
214 | let mut iter = line.splitn(2, ": "); |
215 | if let Some(time_str) = iter.next() { | |
216 | time = chrono::DateTime::parse_from_rfc3339(time_str) | |
217 | .map_err(|err| format_err!("cannot parse '{}': {}", time_str, err))? | |
218 | .timestamp(); | |
219 | } else { | |
220 | continue; | |
221 | } | |
222 | match iter.next().and_then(|rest| rest.strip_prefix("TASK ")) { | |
4b01c983 DM |
223 | None => continue, |
224 | Some(rest) => { | |
4c116baf DC |
225 | if let Ok(state) = rest.parse() { |
226 | status = state; | |
4b01c983 DM |
227 | } |
228 | } | |
229 | } | |
230 | } | |
231 | ||
ae197dda | 232 | Ok((time, status)) |
4b01c983 DM |
233 | } |
234 | ||
/// Task State
///
/// End state of a task, as written to (and parsed back from) the task log
/// and the active task file.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum TaskState {
    /// The Task ended with an undefined state
    Unknown,
    /// The Task ended and there were no errors or warnings
    OK,
    /// The Task had 'count' amount of warnings and no errors
    Warning { count: u64 },
    /// The Task ended with the error described in 'message'
    Error { message: String },
}
247 | ||
248 | impl TaskState { | |
249 | fn result_text(&self) -> String { | |
250 | match self { | |
251 | TaskState::Error { message } => format!("TASK ERROR: {}", message), | |
252 | other => format!("TASK {}", other), | |
253 | } | |
254 | } | |
255 | } | |
256 | ||
257 | impl std::fmt::Display for TaskState { | |
258 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
259 | match self { | |
260 | TaskState::Unknown => write!(f, "unknown"), | |
261 | TaskState::OK => write!(f, "OK"), | |
262 | TaskState::Warning { count } => write!(f, "WARNINGS: {}", count), | |
263 | TaskState::Error { message } => write!(f, "{}", message), | |
264 | } | |
265 | } | |
266 | } | |
267 | ||
268 | impl std::str::FromStr for TaskState { | |
269 | type Err = Error; | |
270 | ||
271 | fn from_str(s: &str) -> Result<Self, Self::Err> { | |
272 | if s == "unknown" { | |
273 | Ok(TaskState::Unknown) | |
274 | } else if s == "OK" { | |
275 | Ok(TaskState::OK) | |
276 | } else if s.starts_with("WARNINGS: ") { | |
277 | let count: u64 = s[10..].parse()?; | |
278 | Ok(TaskState::Warning{ count }) | |
279 | } else if s.len() > 0 { | |
280 | let message = if s.starts_with("ERROR: ") { &s[7..] } else { s }.to_string(); | |
281 | Ok(TaskState::Error{ message }) | |
282 | } else { | |
283 | bail!("unable to parse Task Status '{}'", s); | |
284 | } | |
285 | } | |
286 | } | |
287 | ||
/// Task details including parsed UPID
///
/// If there is no `state`, the task is still running.
#[derive(Debug)]
pub struct TaskListInfo {
    /// The parsed UPID
    pub upid: UPID,
    /// UPID string representation
    pub upid_str: String,
    /// Task `(endtime, status)` if already finished
    pub state: Option<(i64, TaskState)>, // endtime, status
}
300 | ||
301 | // atomically read/update the task list, update status of finished tasks | |
302 | // new_upid is added to the list when specified. | |
303 | // Returns a sorted list of known tasks, | |
304 | fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, Error> { | |
4b01c983 | 305 | |
f74a03da | 306 | let backup_user = crate::backup::backup_user()?; |
35950380 | 307 | |
98c259b4 | 308 | let lock = open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN, std::time::Duration::new(10, 0))?; |
f74a03da | 309 | nix::unistd::chown(PROXMOX_BACKUP_TASK_LOCK_FN, Some(backup_user.uid), Some(backup_user.gid))?; |
4b01c983 | 310 | |
634132fe | 311 | let reader = match File::open(PROXMOX_BACKUP_ACTIVE_TASK_FN) { |
4b01c983 DM |
312 | Ok(f) => Some(BufReader::new(f)), |
313 | Err(err) => { | |
314 | if err.kind() == std::io::ErrorKind::NotFound { | |
315 | None | |
316 | } else { | |
634132fe | 317 | bail!("unable to open active worker {:?} - {}", PROXMOX_BACKUP_ACTIVE_TASK_FN, err); |
4b01c983 DM |
318 | } |
319 | } | |
320 | }; | |
321 | ||
4b01c983 DM |
322 | let mut active_list = vec![]; |
323 | let mut finish_list = vec![]; | |
324 | ||
325 | if let Some(lines) = reader.map(|r| r.lines()) { | |
326 | ||
327 | for line in lines { | |
328 | let line = line?; | |
329 | match parse_worker_status_line(&line) { | |
330 | Err(err) => bail!("unable to parse active worker status '{}' - {}", line, err), | |
fae11693 DC |
331 | Ok((upid_str, upid, state)) => match state { |
332 | None if worker_is_active_local(&upid) => { | |
4b01c983 | 333 | active_list.push(TaskListInfo { upid, upid_str, state: None }); |
fae11693 DC |
334 | }, |
335 | None => { | |
336 | println!("Detected stopped UPID {}", upid_str); | |
ae197dda DC |
337 | let (time, status) = upid_read_status(&upid) |
338 | .unwrap_or_else(|_| (Local::now().timestamp(), TaskState::Unknown)); | |
fae11693 | 339 | finish_list.push(TaskListInfo { |
ae197dda | 340 | upid, upid_str, state: Some((time, status)) |
fae11693 DC |
341 | }); |
342 | }, | |
343 | Some((endtime, status)) => { | |
344 | finish_list.push(TaskListInfo { | |
4c116baf | 345 | upid, upid_str, state: Some((endtime, status.parse()?)) |
fae11693 | 346 | }) |
4b01c983 DM |
347 | } |
348 | } | |
349 | } | |
350 | } | |
351 | } | |
352 | ||
353 | if let Some(upid) = new_upid { | |
354 | active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None }); | |
355 | } | |
356 | ||
357 | // assemble list without duplicates | |
358 | // we include all active tasks, | |
359 | // and fill up to 1000 entries with finished tasks | |
360 | ||
361 | let max = 1000; | |
362 | ||
363 | let mut task_hash = HashMap::new(); | |
364 | ||
365 | for info in active_list { | |
366 | task_hash.insert(info.upid_str.clone(), info); | |
367 | } | |
368 | ||
369 | for info in finish_list { | |
370 | if task_hash.len() > max { break; } | |
371 | if !task_hash.contains_key(&info.upid_str) { | |
372 | task_hash.insert(info.upid_str.clone(), info); | |
373 | } | |
374 | } | |
375 | ||
93aebb38 DM |
376 | let mut task_list: Vec<TaskListInfo> = vec![]; |
377 | for (_, info) in task_hash { task_list.push(info); } | |
378 | ||
ba70040d | 379 | task_list.sort_unstable_by(|b, a| { // lastest on top |
4b01c983 DM |
380 | match (&a.state, &b.state) { |
381 | (Some(s1), Some(s2)) => s1.0.cmp(&s2.0), | |
382 | (Some(_), None) => std::cmp::Ordering::Less, | |
383 | (None, Some(_)) => std::cmp::Ordering::Greater, | |
384 | _ => a.upid.starttime.cmp(&b.upid.starttime), | |
385 | } | |
386 | }); | |
387 | ||
388 | let mut raw = String::new(); | |
389 | for info in &task_list { | |
390 | if let Some((endtime, status)) = &info.state { | |
391 | raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, endtime, status)); | |
392 | } else { | |
393 | raw.push_str(&info.upid_str); | |
394 | raw.push('\n'); | |
395 | } | |
396 | } | |
397 | ||
feaa1ad3 WB |
398 | replace_file( |
399 | PROXMOX_BACKUP_ACTIVE_TASK_FN, | |
400 | raw.as_bytes(), | |
401 | CreateOptions::new() | |
f74a03da DM |
402 | .owner(backup_user.uid) |
403 | .group(backup_user.gid), | |
feaa1ad3 | 404 | )?; |
4b01c983 DM |
405 | |
406 | drop(lock); | |
407 | ||
93aebb38 | 408 | Ok(task_list) |
4b01c983 DM |
409 | } |
410 | ||
/// Returns a sorted list of known tasks
///
/// Running tasks come first (latest `starttime` on top), followed by the
/// finished tasks (latest `endtime` on top).
///
/// Note: This also updates the active task file, i.e. tasks which stopped
/// in the meantime get their end state recorded.
pub fn read_task_list() -> Result<Vec<TaskListInfo>, Error> {
    update_active_workers(None)
}
4b01c983 | 417 | |
/// Launch long running worker tasks.
///
/// A worker task can either be a whole thread, or simply a tokio
/// task/future. Each task can `log()` messages, which are stored
/// persistently to files. Task should poll the `abort_requested`
/// flag, and stop execution when requested.
#[derive(Debug)]
pub struct WorkerTask {
    // unique identifier of this task
    upid: UPID,
    // mutable state (logger, progress, warning count, abort listeners)
    data: Mutex<WorkerTaskData>,
    // set by `request_abort()`
    abort_requested: AtomicBool,
}
430 | ||
/// A worker task is displayed by delegating to the `fmt` implementation of its UPID.
impl std::fmt::Display for WorkerTask {

    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        self.upid.fmt(f)
    }
}
437 | ||
/// Mutable part of a `WorkerTask`, protected by the mutex in `WorkerTask::data`.
#[derive(Debug)]
struct WorkerTaskData {
    // writes the log messages of the task
    logger: FileLogger,
    progress: f64, // 0..1
    // number of warnings logged via `WorkerTask::warn()`
    warn_count: u64,
    // channels to notify when an abort gets requested
    pub abort_listeners: Vec<oneshot::Sender<()>>,
}
445 | ||
/// Only prints a debug message when the worker gets dropped.
impl Drop for WorkerTask {

    fn drop(&mut self) {
        println!("unregister worker");
    }
}
452 | ||
453 | impl WorkerTask { | |
454 | ||
e7cb4dc5 | 455 | pub fn new(worker_type: &str, worker_id: Option<String>, userid: Userid, to_stdout: bool) -> Result<Arc<Self>, Error> { |
479f6e40 DM |
456 | println!("register worker"); |
457 | ||
e7cb4dc5 | 458 | let upid = UPID::new(worker_type, worker_id, userid)?; |
634132fe | 459 | let task_id = upid.task_id; |
479f6e40 | 460 | |
634132fe | 461 | let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR); |
35950380 | 462 | |
479f6e40 DM |
463 | path.push(format!("{:02X}", upid.pstart % 256)); |
464 | ||
f74a03da | 465 | let backup_user = crate::backup::backup_user()?; |
35950380 | 466 | |
f74a03da | 467 | create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?; |
479f6e40 DM |
468 | |
469 | path.push(upid.to_string()); | |
470 | ||
471 | println!("FILE: {:?}", path); | |
472 | ||
35950380 | 473 | let logger = FileLogger::new(&path, to_stdout)?; |
f74a03da | 474 | nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?; |
479f6e40 DM |
475 | |
476 | let worker = Arc::new(Self { | |
05d755b2 | 477 | upid: upid.clone(), |
479f6e40 DM |
478 | abort_requested: AtomicBool::new(false), |
479 | data: Mutex::new(WorkerTaskData { | |
480 | logger, | |
481 | progress: 0.0, | |
f6de2c73 | 482 | warn_count: 0, |
75bc49be | 483 | abort_listeners: vec![], |
479f6e40 DM |
484 | }), |
485 | }); | |
486 | ||
05d755b2 DC |
487 | // scope to drop the lock again after inserting |
488 | { | |
489 | let mut hash = WORKER_TASK_LIST.lock().unwrap(); | |
490 | hash.insert(task_id, worker.clone()); | |
491 | super::set_worker_count(hash.len()); | |
492 | } | |
7a630df7 | 493 | |
05d755b2 | 494 | update_active_workers(Some(&upid))?; |
479f6e40 DM |
495 | |
496 | Ok(worker) | |
497 | } | |
498 | ||
882594c5 | 499 | /// Spawn a new tokio task/future. |
660c6846 DM |
500 | pub fn spawn<F, T>( |
501 | worker_type: &str, | |
502 | worker_id: Option<String>, | |
e7cb4dc5 | 503 | userid: Userid, |
660c6846 DM |
504 | to_stdout: bool, |
505 | f: F, | |
506 | ) -> Result<String, Error> | |
479f6e40 | 507 | where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T, |
75fef4b4 | 508 | T: Send + 'static + Future<Output = Result<(), Error>>, |
479f6e40 | 509 | { |
e7cb4dc5 | 510 | let worker = WorkerTask::new(worker_type, worker_id, userid, to_stdout)?; |
660c6846 | 511 | let upid_str = worker.upid.to_string(); |
75fef4b4 WB |
512 | let f = f(worker.clone()); |
513 | tokio::spawn(async move { | |
514 | let result = f.await; | |
dd8e744f | 515 | worker.log_result(&result); |
75fef4b4 | 516 | }); |
479f6e40 | 517 | |
660c6846 | 518 | Ok(upid_str) |
479f6e40 DM |
519 | } |
520 | ||
882594c5 | 521 | /// Create a new worker thread. |
660c6846 DM |
522 | pub fn new_thread<F>( |
523 | worker_type: &str, | |
524 | worker_id: Option<String>, | |
e7cb4dc5 | 525 | userid: Userid, |
660c6846 DM |
526 | to_stdout: bool, |
527 | f: F, | |
528 | ) -> Result<String, Error> | |
d3f4c08f | 529 | where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error> |
479f6e40 DM |
530 | { |
531 | println!("register worker thread"); | |
532 | ||
e7cb4dc5 | 533 | let worker = WorkerTask::new(worker_type, worker_id, userid, to_stdout)?; |
660c6846 | 534 | let upid_str = worker.upid.to_string(); |
479f6e40 | 535 | |
217170e1 | 536 | let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || { |
d3f4c08f DM |
537 | let worker1 = worker.clone(); |
538 | let result = match std::panic::catch_unwind(move || f(worker1)) { | |
539 | Ok(r) => r, | |
540 | Err(panic) => { | |
541 | match panic.downcast::<&str>() { | |
542 | Ok(panic_msg) => { | |
543 | Err(format_err!("worker panicked: {}", panic_msg)) | |
544 | } | |
545 | Err(_) => { | |
546 | Err(format_err!("worker panicked: unknown type.")) | |
547 | } | |
548 | } | |
549 | } | |
550 | }; | |
551 | ||
dd8e744f | 552 | worker.log_result(&result); |
479f6e40 DM |
553 | }); |
554 | ||
660c6846 | 555 | Ok(upid_str) |
479f6e40 DM |
556 | } |
557 | ||
4c116baf DC |
558 | /// create state from self and a result |
559 | pub fn create_state(&self, result: &Result<(), Error>) -> TaskState { | |
f6de2c73 | 560 | let warn_count = self.data.lock().unwrap().warn_count; |
cef03f41 | 561 | |
4b01c983 | 562 | if let Err(err) = result { |
4c116baf | 563 | TaskState::Error { message: err.to_string() } |
f6de2c73 | 564 | } else if warn_count > 0 { |
4c116baf | 565 | TaskState::Warning { count: warn_count } |
4b01c983 | 566 | } else { |
4c116baf | 567 | TaskState::OK |
4b01c983 | 568 | } |
cef03f41 DC |
569 | } |
570 | ||
571 | /// Log task result, remove task from running list | |
572 | pub fn log_result(&self, result: &Result<(), Error>) { | |
4c116baf DC |
573 | let state = self.create_state(result); |
574 | self.log(state.result_text()); | |
418def7a DM |
575 | |
576 | WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id); | |
577 | let _ = update_active_workers(None); | |
578 | super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len()); | |
4b01c983 DM |
579 | } |
580 | ||
882594c5 | 581 | /// Log a message. |
479f6e40 DM |
582 | pub fn log<S: AsRef<str>>(&self, msg: S) { |
583 | let mut data = self.data.lock().unwrap(); | |
584 | data.logger.log(msg); | |
585 | } | |
586 | ||
f6de2c73 DC |
587 | /// Log a message as warning. |
588 | pub fn warn<S: AsRef<str>>(&self, msg: S) { | |
589 | let mut data = self.data.lock().unwrap(); | |
590 | data.logger.log(format!("WARN: {}", msg.as_ref())); | |
591 | data.warn_count += 1; | |
592 | } | |
593 | ||
882594c5 | 594 | /// Set progress indicator |
479f6e40 DM |
595 | pub fn progress(&self, progress: f64) { |
596 | if progress >= 0.0 && progress <= 1.0 { | |
597 | let mut data = self.data.lock().unwrap(); | |
598 | data.progress = progress; | |
599 | } else { | |
600 | // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress); | |
601 | } | |
602 | } | |
603 | ||
882594c5 | 604 | /// Request abort |
d607b886 | 605 | pub fn request_abort(&self) { |
98a181f0 | 606 | eprintln!("set abort flag for worker {}", self.upid); |
479f6e40 | 607 | self.abort_requested.store(true, Ordering::SeqCst); |
75bc49be DM |
608 | // noitify listeners |
609 | let mut data = self.data.lock().unwrap(); | |
610 | loop { | |
611 | match data.abort_listeners.pop() { | |
612 | None => { break; }, | |
613 | Some(ch) => { | |
614 | let _ = ch.send(()); // ignore erros here | |
615 | }, | |
616 | } | |
617 | } | |
479f6e40 DM |
618 | } |
619 | ||
882594c5 | 620 | /// Test if abort was requested. |
479f6e40 DM |
621 | pub fn abort_requested(&self) -> bool { |
622 | self.abort_requested.load(Ordering::SeqCst) | |
623 | } | |
624 | ||
882594c5 | 625 | /// Fail if abort was requested. |
479f6e40 DM |
626 | pub fn fail_on_abort(&self) -> Result<(), Error> { |
627 | if self.abort_requested() { | |
99641a6b | 628 | bail!("abort requested - aborting task"); |
479f6e40 DM |
629 | } |
630 | Ok(()) | |
631 | } | |
75bc49be DM |
632 | |
633 | /// Get a future which resolves on task abort | |
634 | pub fn abort_future(&self) -> oneshot::Receiver<()> { | |
635 | let (tx, rx) = oneshot::channel::<()>(); | |
636 | ||
637 | let mut data = self.data.lock().unwrap(); | |
638 | if self.abort_requested() { | |
639 | let _ = tx.send(()); | |
640 | } else { | |
641 | data.abort_listeners.push(tx); | |
642 | } | |
643 | rx | |
644 | } | |
4bd2a9e4 DC |
645 | |
646 | pub fn upid(&self) -> &UPID { | |
647 | &self.upid | |
648 | } | |
479f6e40 | 649 | } |