]>
Commit | Line | Data |
---|---|---|
479f6e40 DM |
1 | use failure::*; |
2 | use lazy_static::lazy_static; | |
3 | use chrono::Local; | |
4 | ||
5 | use tokio::sync::oneshot; | |
6 | use futures::*; | |
7 | use std::sync::{Arc, Mutex}; | |
8 | use std::collections::HashMap; | |
634132fe | 9 | use std::sync::atomic::{AtomicBool, Ordering}; |
4b01c983 DM |
10 | use std::io::{BufRead, BufReader}; |
11 | use std::fs::File; | |
d3f4c08f DM |
12 | use std::panic::UnwindSafe; |
13 | ||
321070b4 | 14 | use serde_json::{json, Value}; |
479f6e40 | 15 | |
e18a6c9e DM |
16 | use proxmox::tools::{ |
17 | try_block, | |
18 | fs::{create_dir_chown, file_set_contents_full}, | |
19 | }; | |
20 | ||
634132fe DM |
21 | use super::UPID; |
22 | ||
e18a6c9e | 23 | use crate::tools::FileLogger; |
479f6e40 | 24 | |
// The base paths are defined as macros (in addition to the constants below)
// because `concat!` only accepts literals, not `const` items.
macro_rules! PROXMOX_BACKUP_VAR_RUN_DIR_M { () => ("/var/run/proxmox-backup") }
macro_rules! PROXMOX_BACKUP_LOG_DIR_M { () => ("/var/log/proxmox-backup") }
macro_rules! PROXMOX_BACKUP_TASK_DIR_M { () => (concat!( PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks")) }

/// Runtime directory; also used as prefix for the task control socket name.
pub const PROXMOX_BACKUP_VAR_RUN_DIR: &str = PROXMOX_BACKUP_VAR_RUN_DIR_M!();
/// Base directory for log files.
pub const PROXMOX_BACKUP_LOG_DIR: &str = PROXMOX_BACKUP_LOG_DIR_M!();
/// Directory containing the task logs and the active task list.
pub const PROXMOX_BACKUP_TASK_DIR: &str = PROXMOX_BACKUP_TASK_DIR_M!();
/// Lock file taken while the active task list is read or rewritten.
pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/.active.lock");
/// File holding the list of known (active and finished) tasks.
pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/active");
479f6e40 DM |
34 | |
lazy_static! {
    // Worker tasks registered in this process, keyed by the task id of their UPID.
    static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());

    // Process id of the current process.
    static ref MY_PID: i32 = unsafe { libc::getpid() };
    // Start time of the current process as read from procfs.
    // NOTE: the `unwrap()` panics on first access if the stat entry cannot be read.
    static ref MY_PID_PSTART: u64 = crate::tools::procfs::read_proc_pid_stat(*MY_PID).unwrap().starttime;
}
41 | ||
634132fe DM |
42 | /// Test if the task is still running |
43 | pub fn worker_is_active(upid: &UPID) -> bool { | |
4494d078 | 44 | |
634132fe DM |
45 | if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) { |
46 | if WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) { | |
47 | true | |
b75b9681 | 48 | } else { |
634132fe | 49 | false |
9a0d0ff7 | 50 | } |
634132fe | 51 | } else { |
e18a6c9e | 52 | match crate::tools::procfs::check_process_running_pstart(upid.pid, upid.pstart) { |
634132fe DM |
53 | Some(_) => true, |
54 | _ => false, | |
9a0d0ff7 | 55 | } |
479f6e40 DM |
56 | } |
57 | } | |
58 | ||
d607b886 DM |
59 | pub fn create_task_control_socket() -> Result<(), Error> { |
60 | ||
61 | let socketname = format!( | |
9b002cbc | 62 | "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, *MY_PID); |
d607b886 | 63 | |
9b002cbc | 64 | let control_future = super::create_control_socket(socketname, |param| { |
d607b886 DM |
65 | let param = param.as_object() |
66 | .ok_or(format_err!("unable to parse parameters (expected json object)"))?; | |
321070b4 | 67 | if param.keys().count() != 2 { bail!("wrong number of parameters"); } |
d607b886 DM |
68 | |
69 | let command = param.get("command") | |
70 | .ok_or(format_err!("unable to parse parameters (missing command)"))?; | |
71 | ||
72 | // this is the only command for now | |
73 | if command != "abort-task" { bail!("got unknown command '{}'", command); } | |
74 | ||
75 | let upid_str = param["upid"].as_str() | |
76 | .ok_or(format_err!("unable to parse parameters (missing upid)"))?; | |
77 | ||
78 | let upid = upid_str.parse::<UPID>()?; | |
79 | ||
80 | if !((upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART)) { | |
81 | bail!("upid does not belong to this process"); | |
82 | } | |
83 | ||
84 | let hash = WORKER_TASK_LIST.lock().unwrap(); | |
85 | if let Some(ref worker) = hash.get(&upid.task_id) { | |
86 | worker.request_abort(); | |
87 | } else { | |
88 | // assume task is already stopped | |
89 | } | |
90 | Ok(Value::Null) | |
91 | })?; | |
92 | ||
93 | tokio::spawn(control_future); | |
94 | ||
95 | Ok(()) | |
96 | } | |
97 | ||
321070b4 DM |
98 | pub fn abort_worker_async(upid: UPID) { |
99 | let task = abort_worker(upid); | |
100 | ||
101 | tokio::spawn(task.then(|res| { | |
102 | if let Err(err) = res { | |
103 | eprintln!("abort worker failed - {}", err); | |
104 | } | |
105 | Ok(()) | |
106 | })); | |
107 | } | |
108 | ||
109 | pub fn abort_worker(upid: UPID) -> impl Future<Item=(), Error=Error> { | |
110 | ||
111 | let target_pid = upid.pid; | |
112 | ||
113 | let socketname = format!( | |
114 | "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, target_pid); | |
115 | ||
116 | let cmd = json!({ | |
117 | "command": "abort-task", | |
118 | "upid": upid.to_string(), | |
119 | }); | |
120 | ||
121 | super::send_command(socketname, cmd).map(|_| {}) | |
122 | } | |
123 | ||
4b01c983 DM |
124 | fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, String)>), Error> { |
125 | ||
126 | let data = line.splitn(3, ' ').collect::<Vec<&str>>(); | |
127 | ||
128 | let len = data.len(); | |
129 | ||
130 | match len { | |
131 | 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)), | |
132 | 3 => { | |
133 | let endtime = i64::from_str_radix(data[1], 16)?; | |
134 | Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some((endtime, data[2].to_owned())))) | |
135 | } | |
136 | _ => bail!("wrong number of components"), | |
137 | } | |
138 | } | |
139 | ||
35950380 | 140 | /// Create task log directory with correct permissions |
d607b886 | 141 | pub fn create_task_log_dirs() -> Result<(), Error> { |
35950380 DM |
142 | |
143 | try_block!({ | |
e18a6c9e | 144 | let (backup_uid, backup_gid) = crate::tools::getpwnam_ugid("backup")?; |
35950380 DM |
145 | let uid = Some(nix::unistd::Uid::from_raw(backup_uid)); |
146 | let gid = Some(nix::unistd::Gid::from_raw(backup_gid)); | |
147 | ||
e18a6c9e DM |
148 | create_dir_chown(PROXMOX_BACKUP_LOG_DIR, None, uid, gid)?; |
149 | create_dir_chown(PROXMOX_BACKUP_TASK_DIR, None, uid, gid)?; | |
150 | create_dir_chown(PROXMOX_BACKUP_VAR_RUN_DIR, None, uid, gid)?; | |
35950380 DM |
151 | Ok(()) |
152 | }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?; | |
153 | ||
154 | Ok(()) | |
155 | } | |
156 | ||
0bfd87bc DM |
157 | /// Read exits status from task log file |
158 | pub fn upid_read_status(upid: &UPID) -> Result<String, Error> { | |
4b01c983 DM |
159 | let mut status = String::from("unknown"); |
160 | ||
4494d078 | 161 | let path = upid.log_path(); |
4b01c983 | 162 | |
0bfd87bc DM |
163 | let mut file = File::open(path)?; |
164 | ||
165 | /// speedup - only read tail | |
166 | use std::io::Seek; | |
167 | use std::io::SeekFrom; | |
168 | let _ = file.seek(SeekFrom::End(-8192)); // ignore errors | |
169 | ||
4b01c983 DM |
170 | let reader = BufReader::new(file); |
171 | ||
172 | for line in reader.lines() { | |
173 | let line = line?; | |
174 | ||
175 | let mut iter = line.splitn(2, ": TASK "); | |
176 | if iter.next() == None { continue; } | |
177 | match iter.next() { | |
178 | None => continue, | |
179 | Some(rest) => { | |
180 | if rest == "OK" { | |
181 | status = String::from(rest); | |
182 | } else if rest.starts_with("ERROR: ") { | |
1be71fb0 | 183 | status = String::from(&rest[7..]); |
4b01c983 DM |
184 | } |
185 | } | |
186 | } | |
187 | } | |
188 | ||
189 | Ok(status) | |
190 | } | |
191 | ||
/// Task details including parsed UPID
///
/// If there is no `state`, the task is still running.
#[derive(Debug)]
pub struct TaskListInfo {
    /// The parsed UPID
    pub upid: UPID,
    /// UPID string representation
    pub upid_str: String,
    /// Task `(endtime, status)` if already finished
    ///
    /// For tasks detected as stopped, the `status` is either `unknown`, `OK`,
    /// or the error message read from the task log.
    pub state: Option<(i64, String)>, // endtime, status
}
206 | ||
207 | // atomically read/update the task list, update status of finished tasks | |
208 | // new_upid is added to the list when specified. | |
209 | // Returns a sorted list of known tasks, | |
210 | fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, Error> { | |
4b01c983 | 211 | |
e18a6c9e | 212 | let (backup_uid, backup_gid) = crate::tools::getpwnam_ugid("backup")?; |
35950380 DM |
213 | let uid = Some(nix::unistd::Uid::from_raw(backup_uid)); |
214 | let gid = Some(nix::unistd::Gid::from_raw(backup_gid)); | |
215 | ||
e18a6c9e | 216 | let lock = crate::tools::open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN, std::time::Duration::new(10, 0))?; |
634132fe | 217 | nix::unistd::chown(PROXMOX_BACKUP_TASK_LOCK_FN, uid, gid)?; |
4b01c983 | 218 | |
634132fe | 219 | let reader = match File::open(PROXMOX_BACKUP_ACTIVE_TASK_FN) { |
4b01c983 DM |
220 | Ok(f) => Some(BufReader::new(f)), |
221 | Err(err) => { | |
222 | if err.kind() == std::io::ErrorKind::NotFound { | |
223 | None | |
224 | } else { | |
634132fe | 225 | bail!("unable to open active worker {:?} - {}", PROXMOX_BACKUP_ACTIVE_TASK_FN, err); |
4b01c983 DM |
226 | } |
227 | } | |
228 | }; | |
229 | ||
4b01c983 DM |
230 | let mut active_list = vec![]; |
231 | let mut finish_list = vec![]; | |
232 | ||
233 | if let Some(lines) = reader.map(|r| r.lines()) { | |
234 | ||
235 | for line in lines { | |
236 | let line = line?; | |
237 | match parse_worker_status_line(&line) { | |
238 | Err(err) => bail!("unable to parse active worker status '{}' - {}", line, err), | |
239 | Ok((upid_str, upid, state)) => { | |
240 | ||
634132fe | 241 | let running = worker_is_active(&upid); |
4b01c983 DM |
242 | |
243 | if running { | |
244 | active_list.push(TaskListInfo { upid, upid_str, state: None }); | |
245 | } else { | |
246 | match state { | |
247 | None => { | |
248 | println!("Detected stoped UPID {}", upid_str); | |
249 | let status = upid_read_status(&upid).unwrap_or(String::from("unknown")); | |
250 | finish_list.push(TaskListInfo { | |
251 | upid, upid_str, state: Some((Local::now().timestamp(), status)) | |
252 | }); | |
253 | } | |
254 | Some((endtime, status)) => { | |
255 | finish_list.push(TaskListInfo { | |
256 | upid, upid_str, state: Some((endtime, status)) | |
257 | }) | |
258 | } | |
259 | } | |
260 | } | |
261 | } | |
262 | } | |
263 | } | |
264 | } | |
265 | ||
266 | if let Some(upid) = new_upid { | |
267 | active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None }); | |
268 | } | |
269 | ||
270 | // assemble list without duplicates | |
271 | // we include all active tasks, | |
272 | // and fill up to 1000 entries with finished tasks | |
273 | ||
274 | let max = 1000; | |
275 | ||
276 | let mut task_hash = HashMap::new(); | |
277 | ||
278 | for info in active_list { | |
279 | task_hash.insert(info.upid_str.clone(), info); | |
280 | } | |
281 | ||
282 | for info in finish_list { | |
283 | if task_hash.len() > max { break; } | |
284 | if !task_hash.contains_key(&info.upid_str) { | |
285 | task_hash.insert(info.upid_str.clone(), info); | |
286 | } | |
287 | } | |
288 | ||
93aebb38 DM |
289 | let mut task_list: Vec<TaskListInfo> = vec![]; |
290 | for (_, info) in task_hash { task_list.push(info); } | |
291 | ||
ba70040d | 292 | task_list.sort_unstable_by(|b, a| { // lastest on top |
4b01c983 DM |
293 | match (&a.state, &b.state) { |
294 | (Some(s1), Some(s2)) => s1.0.cmp(&s2.0), | |
295 | (Some(_), None) => std::cmp::Ordering::Less, | |
296 | (None, Some(_)) => std::cmp::Ordering::Greater, | |
297 | _ => a.upid.starttime.cmp(&b.upid.starttime), | |
298 | } | |
299 | }); | |
300 | ||
301 | let mut raw = String::new(); | |
302 | for info in &task_list { | |
303 | if let Some((endtime, status)) = &info.state { | |
304 | raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, endtime, status)); | |
305 | } else { | |
306 | raw.push_str(&info.upid_str); | |
307 | raw.push('\n'); | |
308 | } | |
309 | } | |
310 | ||
e18a6c9e | 311 | file_set_contents_full(PROXMOX_BACKUP_ACTIVE_TASK_FN, raw.as_bytes(), None, uid, gid)?; |
4b01c983 DM |
312 | |
313 | drop(lock); | |
314 | ||
93aebb38 | 315 | Ok(task_list) |
4b01c983 DM |
316 | } |
317 | ||
/// Returns a sorted list of known tasks
///
/// Running tasks come first (latest start time on top), followed by
/// finished tasks ordered by end time (latest on top).
///
/// Note: this calls `update_active_workers(None)`, so the active task file
/// gets rewritten as a side effect.
pub fn read_task_list() -> Result<Vec<TaskListInfo>, Error> {
    update_active_workers(None)
}
4b01c983 | 324 | |
/// Launch long running worker tasks.
///
/// A worker task can either be a whole thread, or simply a tokio
/// task/future. Each task can `log()` messages, which are stored
/// persistently to files. Task should poll the `abort_requested`
/// flag, and stop execution when requested.
#[derive(Debug)]
pub struct WorkerTask {
    // Identifier of this task
    upid: UPID,
    // Mutable task state (logger, progress, abort listeners)
    data: Mutex<WorkerTaskData>,
    // Set to true by `request_abort()`
    abort_requested: AtomicBool,
}
337 | ||
/// A worker task is displayed by delegating to the `fmt` implementation of its UPID.
impl std::fmt::Display for WorkerTask {

    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // NOTE(review): presumably resolves to the `Display` impl of `UPID` - confirm
        self.upid.fmt(f)
    }
}
344 | ||
/// Mutable state of a `WorkerTask` (stored behind a mutex)
#[derive(Debug)]
struct WorkerTaskData {
    // Logger writing to the task log file
    logger: FileLogger,
    progress: f64, // 0..1
    // Channels to notify when an abort is requested
    pub abort_listeners: Vec<oneshot::Sender<()>>,
}
351 | ||
/// Prints a debug message to stdout when the worker task is dropped.
impl Drop for WorkerTask {

    fn drop(&mut self) {
        println!("unregister worker");
    }
}
358 | ||
359 | impl WorkerTask { | |
360 | ||
418def7a | 361 | pub fn new(worker_type: &str, worker_id: Option<String>, username: &str, to_stdout: bool) -> Result<Arc<Self>, Error> { |
479f6e40 DM |
362 | println!("register worker"); |
363 | ||
634132fe DM |
364 | let upid = UPID::new(worker_type, worker_id, username)?; |
365 | let task_id = upid.task_id; | |
479f6e40 | 366 | |
634132fe | 367 | let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR); |
35950380 | 368 | |
479f6e40 DM |
369 | path.push(format!("{:02X}", upid.pstart % 256)); |
370 | ||
e18a6c9e | 371 | let (backup_uid, backup_gid) = crate::tools::getpwnam_ugid("backup")?; |
35950380 DM |
372 | let uid = Some(nix::unistd::Uid::from_raw(backup_uid)); |
373 | let gid = Some(nix::unistd::Gid::from_raw(backup_gid)); | |
374 | ||
e18a6c9e | 375 | create_dir_chown(&path, None, uid, gid)?; |
479f6e40 DM |
376 | |
377 | path.push(upid.to_string()); | |
378 | ||
379 | println!("FILE: {:?}", path); | |
380 | ||
35950380 DM |
381 | let logger = FileLogger::new(&path, to_stdout)?; |
382 | nix::unistd::chown(&path, uid, gid)?; | |
479f6e40 | 383 | |
4b01c983 DM |
384 | update_active_workers(Some(&upid))?; |
385 | ||
479f6e40 DM |
386 | let worker = Arc::new(Self { |
387 | upid: upid, | |
388 | abort_requested: AtomicBool::new(false), | |
389 | data: Mutex::new(WorkerTaskData { | |
390 | logger, | |
391 | progress: 0.0, | |
75bc49be | 392 | abort_listeners: vec![], |
479f6e40 DM |
393 | }), |
394 | }); | |
395 | ||
7a630df7 DM |
396 | let mut hash = WORKER_TASK_LIST.lock().unwrap(); |
397 | ||
398 | hash.insert(task_id, worker.clone()); | |
399 | super::set_worker_count(hash.len()); | |
479f6e40 DM |
400 | |
401 | Ok(worker) | |
402 | } | |
403 | ||
882594c5 | 404 | /// Spawn a new tokio task/future. |
660c6846 DM |
405 | pub fn spawn<F, T>( |
406 | worker_type: &str, | |
407 | worker_id: Option<String>, | |
408 | username: &str, | |
409 | to_stdout: bool, | |
410 | f: F, | |
411 | ) -> Result<String, Error> | |
479f6e40 | 412 | where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T, |
4b01c983 | 413 | T: Send + 'static + Future<Item=(), Error=Error>, |
479f6e40 DM |
414 | { |
415 | let worker = WorkerTask::new(worker_type, worker_id, username, to_stdout)?; | |
660c6846 | 416 | let upid_str = worker.upid.to_string(); |
479f6e40 | 417 | |
4b01c983 | 418 | tokio::spawn(f(worker.clone()).then(move |result| { |
dd8e744f | 419 | worker.log_result(&result); |
479f6e40 DM |
420 | Ok(()) |
421 | })); | |
422 | ||
660c6846 | 423 | Ok(upid_str) |
479f6e40 DM |
424 | } |
425 | ||
882594c5 | 426 | /// Create a new worker thread. |
660c6846 DM |
427 | pub fn new_thread<F>( |
428 | worker_type: &str, | |
429 | worker_id: Option<String>, | |
430 | username: &str, | |
431 | to_stdout: bool, | |
432 | f: F, | |
433 | ) -> Result<String, Error> | |
d3f4c08f | 434 | where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error> |
479f6e40 DM |
435 | { |
436 | println!("register worker thread"); | |
437 | ||
438 | let (p, c) = oneshot::channel::<()>(); | |
439 | ||
440 | let worker = WorkerTask::new(worker_type, worker_id, username, to_stdout)?; | |
660c6846 | 441 | let upid_str = worker.upid.to_string(); |
479f6e40 DM |
442 | |
443 | let _child = std::thread::spawn(move || { | |
d3f4c08f DM |
444 | let worker1 = worker.clone(); |
445 | let result = match std::panic::catch_unwind(move || f(worker1)) { | |
446 | Ok(r) => r, | |
447 | Err(panic) => { | |
448 | match panic.downcast::<&str>() { | |
449 | Ok(panic_msg) => { | |
450 | Err(format_err!("worker panicked: {}", panic_msg)) | |
451 | } | |
452 | Err(_) => { | |
453 | Err(format_err!("worker panicked: unknown type.")) | |
454 | } | |
455 | } | |
456 | } | |
457 | }; | |
458 | ||
dd8e744f | 459 | worker.log_result(&result); |
479f6e40 DM |
460 | p.send(()).unwrap(); |
461 | }); | |
462 | ||
463 | tokio::spawn(c.then(|_| Ok(()))); | |
464 | ||
660c6846 | 465 | Ok(upid_str) |
479f6e40 DM |
466 | } |
467 | ||
418def7a | 468 | /// Log task result, remove task from running list |
dd8e744f | 469 | pub fn log_result(&self, result: &Result<(), Error>) { |
418def7a | 470 | |
4b01c983 DM |
471 | if let Err(err) = result { |
472 | self.log(&format!("TASK ERROR: {}", err)); | |
473 | } else { | |
474 | self.log("TASK OK"); | |
475 | } | |
418def7a DM |
476 | |
477 | WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id); | |
478 | let _ = update_active_workers(None); | |
479 | super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len()); | |
4b01c983 DM |
480 | } |
481 | ||
882594c5 | 482 | /// Log a message. |
479f6e40 DM |
483 | pub fn log<S: AsRef<str>>(&self, msg: S) { |
484 | let mut data = self.data.lock().unwrap(); | |
485 | data.logger.log(msg); | |
486 | } | |
487 | ||
882594c5 | 488 | /// Set progress indicator |
479f6e40 DM |
489 | pub fn progress(&self, progress: f64) { |
490 | if progress >= 0.0 && progress <= 1.0 { | |
491 | let mut data = self.data.lock().unwrap(); | |
492 | data.progress = progress; | |
493 | } else { | |
494 | // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress); | |
495 | } | |
496 | } | |
497 | ||
882594c5 | 498 | /// Request abort |
d607b886 | 499 | pub fn request_abort(&self) { |
98a181f0 | 500 | eprintln!("set abort flag for worker {}", self.upid); |
479f6e40 | 501 | self.abort_requested.store(true, Ordering::SeqCst); |
75bc49be DM |
502 | // noitify listeners |
503 | let mut data = self.data.lock().unwrap(); | |
504 | loop { | |
505 | match data.abort_listeners.pop() { | |
506 | None => { break; }, | |
507 | Some(ch) => { | |
508 | let _ = ch.send(()); // ignore erros here | |
509 | }, | |
510 | } | |
511 | } | |
479f6e40 DM |
512 | } |
513 | ||
882594c5 | 514 | /// Test if abort was requested. |
479f6e40 DM |
515 | pub fn abort_requested(&self) -> bool { |
516 | self.abort_requested.load(Ordering::SeqCst) | |
517 | } | |
518 | ||
882594c5 | 519 | /// Fail if abort was requested. |
479f6e40 DM |
520 | pub fn fail_on_abort(&self) -> Result<(), Error> { |
521 | if self.abort_requested() { | |
522 | bail!("task '{}': abort requested - aborting task", self.upid); | |
523 | } | |
524 | Ok(()) | |
525 | } | |
75bc49be DM |
526 | |
527 | /// Get a future which resolves on task abort | |
528 | pub fn abort_future(&self) -> oneshot::Receiver<()> { | |
529 | let (tx, rx) = oneshot::channel::<()>(); | |
530 | ||
531 | let mut data = self.data.lock().unwrap(); | |
532 | if self.abort_requested() { | |
533 | let _ = tx.send(()); | |
534 | } else { | |
535 | data.abort_listeners.push(tx); | |
536 | } | |
537 | rx | |
538 | } | |
479f6e40 | 539 | } |