]>
Commit | Line | Data |
---|---|---|
479f6e40 | 1 | use std::collections::HashMap; |
4b01c983 | 2 | use std::fs::File; |
18c0df4c | 3 | use std::io::{BufRead, BufReader}; |
d3f4c08f | 4 | use std::panic::UnwindSafe; |
18c0df4c WB |
5 | use std::sync::atomic::{AtomicBool, Ordering}; |
6 | use std::sync::{Arc, Mutex}; | |
d3f4c08f | 7 | |
18c0df4c WB |
8 | use chrono::Local; |
9 | use failure::*; | |
10 | use futures::*; | |
11 | use lazy_static::lazy_static; | |
619495b2 | 12 | use nix::unistd::Pid; |
321070b4 | 13 | use serde_json::{json, Value}; |
18c0df4c | 14 | use tokio::sync::oneshot; |
479f6e40 | 15 | |
619495b2 | 16 | use proxmox::sys::linux::procfs; |
e18a6c9e DM |
17 | use proxmox::tools::{ |
18 | try_block, | |
feaa1ad3 | 19 | fs::{create_path, replace_file, CreateOptions}, |
e18a6c9e DM |
20 | }; |
21 | ||
634132fe DM |
22 | use super::UPID; |
23 | ||
e18a6c9e | 24 | use crate::tools::FileLogger; |
479f6e40 | 25 | |
2ec979e4 | 26 | macro_rules! PROXMOX_BACKUP_VAR_RUN_DIR_M { () => ("/run/proxmox-backup") } |
634132fe DM |
27 | macro_rules! PROXMOX_BACKUP_LOG_DIR_M { () => ("/var/log/proxmox-backup") } |
28 | macro_rules! PROXMOX_BACKUP_TASK_DIR_M { () => (concat!( PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks")) } | |
29 | ||
d607b886 | 30 | pub const PROXMOX_BACKUP_VAR_RUN_DIR: &str = PROXMOX_BACKUP_VAR_RUN_DIR_M!(); |
634132fe DM |
31 | pub const PROXMOX_BACKUP_LOG_DIR: &str = PROXMOX_BACKUP_LOG_DIR_M!(); |
32 | pub const PROXMOX_BACKUP_TASK_DIR: &str = PROXMOX_BACKUP_TASK_DIR_M!(); | |
33 | pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/.active.lock"); | |
34 | pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/active"); | |
479f6e40 DM |
35 | |
36 | lazy_static! { | |
37 | static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new()); | |
d607b886 DM |
38 | |
39 | static ref MY_PID: i32 = unsafe { libc::getpid() }; | |
6a0dc4a5 | 40 | static ref MY_PID_PSTART: u64 = procfs::PidStat::read_from_pid(Pid::from_raw(*MY_PID)) |
619495b2 WB |
41 | .unwrap() |
42 | .starttime; | |
479f6e40 DM |
43 | } |
44 | ||
634132fe DM |
45 | /// Test if the task is still running |
46 | pub fn worker_is_active(upid: &UPID) -> bool { | |
4494d078 | 47 | |
634132fe | 48 | if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) { |
62ee2eb4 | 49 | WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) |
634132fe | 50 | } else { |
62ee2eb4 | 51 | procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() |
479f6e40 DM |
52 | } |
53 | } | |
54 | ||
d607b886 DM |
55 | pub fn create_task_control_socket() -> Result<(), Error> { |
56 | ||
57 | let socketname = format!( | |
9b002cbc | 58 | "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, *MY_PID); |
d607b886 | 59 | |
9b002cbc | 60 | let control_future = super::create_control_socket(socketname, |param| { |
d607b886 | 61 | let param = param.as_object() |
62ee2eb4 | 62 | .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?; |
321070b4 | 63 | if param.keys().count() != 2 { bail!("wrong number of parameters"); } |
d607b886 DM |
64 | |
65 | let command = param.get("command") | |
62ee2eb4 | 66 | .ok_or_else(|| format_err!("unable to parse parameters (missing command)"))?; |
d607b886 DM |
67 | |
68 | // this is the only command for now | |
69 | if command != "abort-task" { bail!("got unknown command '{}'", command); } | |
70 | ||
71 | let upid_str = param["upid"].as_str() | |
62ee2eb4 | 72 | .ok_or_else(|| format_err!("unable to parse parameters (missing upid)"))?; |
d607b886 DM |
73 | |
74 | let upid = upid_str.parse::<UPID>()?; | |
75 | ||
76 | if !((upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART)) { | |
77 | bail!("upid does not belong to this process"); | |
78 | } | |
79 | ||
80 | let hash = WORKER_TASK_LIST.lock().unwrap(); | |
81 | if let Some(ref worker) = hash.get(&upid.task_id) { | |
82 | worker.request_abort(); | |
83 | } else { | |
84 | // assume task is already stopped | |
85 | } | |
86 | Ok(Value::Null) | |
87 | })?; | |
88 | ||
89 | tokio::spawn(control_future); | |
90 | ||
91 | Ok(()) | |
92 | } | |
93 | ||
321070b4 | 94 | pub fn abort_worker_async(upid: UPID) { |
75fef4b4 WB |
95 | tokio::spawn(async move { |
96 | if let Err(err) = abort_worker(upid).await { | |
321070b4 DM |
97 | eprintln!("abort worker failed - {}", err); |
98 | } | |
75fef4b4 | 99 | }); |
321070b4 DM |
100 | } |
101 | ||
75fef4b4 | 102 | pub fn abort_worker(upid: UPID) -> impl Future<Output = Result<(), Error>> { |
321070b4 DM |
103 | |
104 | let target_pid = upid.pid; | |
105 | ||
106 | let socketname = format!( | |
107 | "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, target_pid); | |
108 | ||
109 | let cmd = json!({ | |
110 | "command": "abort-task", | |
111 | "upid": upid.to_string(), | |
112 | }); | |
113 | ||
75fef4b4 | 114 | super::send_command(socketname, cmd).map_ok(|_| ()) |
321070b4 DM |
115 | } |
116 | ||
4b01c983 DM |
117 | fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, String)>), Error> { |
118 | ||
119 | let data = line.splitn(3, ' ').collect::<Vec<&str>>(); | |
120 | ||
121 | let len = data.len(); | |
122 | ||
123 | match len { | |
124 | 1 => Ok((data[0].to_owned(), data[0].parse::<UPID>()?, None)), | |
125 | 3 => { | |
126 | let endtime = i64::from_str_radix(data[1], 16)?; | |
127 | Ok((data[0].to_owned(), data[0].parse::<UPID>()?, Some((endtime, data[2].to_owned())))) | |
128 | } | |
129 | _ => bail!("wrong number of components"), | |
130 | } | |
131 | } | |
132 | ||
35950380 | 133 | /// Create task log directory with correct permissions |
d607b886 | 134 | pub fn create_task_log_dirs() -> Result<(), Error> { |
35950380 DM |
135 | |
136 | try_block!({ | |
f74a03da | 137 | let backup_user = crate::backup::backup_user()?; |
35238e23 | 138 | let opts = CreateOptions::new() |
f74a03da DM |
139 | .owner(backup_user.uid) |
140 | .group(backup_user.gid); | |
35950380 | 141 | |
35238e23 WB |
142 | create_path(PROXMOX_BACKUP_LOG_DIR, None, Some(opts.clone()))?; |
143 | create_path(PROXMOX_BACKUP_TASK_DIR, None, Some(opts.clone()))?; | |
144 | create_path(PROXMOX_BACKUP_VAR_RUN_DIR, None, Some(opts))?; | |
35950380 DM |
145 | Ok(()) |
146 | }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?; | |
147 | ||
148 | Ok(()) | |
149 | } | |
150 | ||
0bfd87bc DM |
151 | /// Read exits status from task log file |
152 | pub fn upid_read_status(upid: &UPID) -> Result<String, Error> { | |
4b01c983 DM |
153 | let mut status = String::from("unknown"); |
154 | ||
4494d078 | 155 | let path = upid.log_path(); |
4b01c983 | 156 | |
0bfd87bc DM |
157 | let mut file = File::open(path)?; |
158 | ||
159 | /// speedup - only read tail | |
160 | use std::io::Seek; | |
161 | use std::io::SeekFrom; | |
162 | let _ = file.seek(SeekFrom::End(-8192)); // ignore errors | |
163 | ||
4b01c983 DM |
164 | let reader = BufReader::new(file); |
165 | ||
166 | for line in reader.lines() { | |
167 | let line = line?; | |
168 | ||
169 | let mut iter = line.splitn(2, ": TASK "); | |
170 | if iter.next() == None { continue; } | |
171 | match iter.next() { | |
172 | None => continue, | |
173 | Some(rest) => { | |
174 | if rest == "OK" { | |
175 | status = String::from(rest); | |
176 | } else if rest.starts_with("ERROR: ") { | |
1be71fb0 | 177 | status = String::from(&rest[7..]); |
4b01c983 DM |
178 | } |
179 | } | |
180 | } | |
181 | } | |
182 | ||
183 | Ok(status) | |
184 | } | |
185 | ||
93aebb38 DM |
186 | /// Task details including parsed UPID |
187 | /// | |
188 | /// If there is no `state`, the task is still running. | |
189 | #[derive(Debug)] | |
190 | pub struct TaskListInfo { | |
191 | /// The parsed UPID | |
192 | pub upid: UPID, | |
193 | /// UPID string representation | |
194 | pub upid_str: String, | |
195 | /// Task `(endtime, status)` if already finished | |
196 | /// | |
197 | /// The `status` ise iether `unknown`, `OK`, or `ERROR: ...` | |
198 | pub state: Option<(i64, String)>, // endtime, status | |
199 | } | |
200 | ||
201 | // atomically read/update the task list, update status of finished tasks | |
202 | // new_upid is added to the list when specified. | |
203 | // Returns a sorted list of known tasks, | |
204 | fn update_active_workers(new_upid: Option<&UPID>) -> Result<Vec<TaskListInfo>, Error> { | |
4b01c983 | 205 | |
f74a03da | 206 | let backup_user = crate::backup::backup_user()?; |
35950380 | 207 | |
e18a6c9e | 208 | let lock = crate::tools::open_file_locked(PROXMOX_BACKUP_TASK_LOCK_FN, std::time::Duration::new(10, 0))?; |
f74a03da | 209 | nix::unistd::chown(PROXMOX_BACKUP_TASK_LOCK_FN, Some(backup_user.uid), Some(backup_user.gid))?; |
4b01c983 | 210 | |
634132fe | 211 | let reader = match File::open(PROXMOX_BACKUP_ACTIVE_TASK_FN) { |
4b01c983 DM |
212 | Ok(f) => Some(BufReader::new(f)), |
213 | Err(err) => { | |
214 | if err.kind() == std::io::ErrorKind::NotFound { | |
215 | None | |
216 | } else { | |
634132fe | 217 | bail!("unable to open active worker {:?} - {}", PROXMOX_BACKUP_ACTIVE_TASK_FN, err); |
4b01c983 DM |
218 | } |
219 | } | |
220 | }; | |
221 | ||
4b01c983 DM |
222 | let mut active_list = vec![]; |
223 | let mut finish_list = vec![]; | |
224 | ||
225 | if let Some(lines) = reader.map(|r| r.lines()) { | |
226 | ||
227 | for line in lines { | |
228 | let line = line?; | |
229 | match parse_worker_status_line(&line) { | |
230 | Err(err) => bail!("unable to parse active worker status '{}' - {}", line, err), | |
231 | Ok((upid_str, upid, state)) => { | |
232 | ||
634132fe | 233 | let running = worker_is_active(&upid); |
4b01c983 DM |
234 | |
235 | if running { | |
236 | active_list.push(TaskListInfo { upid, upid_str, state: None }); | |
237 | } else { | |
238 | match state { | |
239 | None => { | |
240 | println!("Detected stoped UPID {}", upid_str); | |
62ee2eb4 DM |
241 | let status = upid_read_status(&upid) |
242 | .unwrap_or_else(|_| String::from("unknown")); | |
4b01c983 DM |
243 | finish_list.push(TaskListInfo { |
244 | upid, upid_str, state: Some((Local::now().timestamp(), status)) | |
245 | }); | |
246 | } | |
247 | Some((endtime, status)) => { | |
248 | finish_list.push(TaskListInfo { | |
249 | upid, upid_str, state: Some((endtime, status)) | |
250 | }) | |
251 | } | |
252 | } | |
253 | } | |
254 | } | |
255 | } | |
256 | } | |
257 | } | |
258 | ||
259 | if let Some(upid) = new_upid { | |
260 | active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None }); | |
261 | } | |
262 | ||
263 | // assemble list without duplicates | |
264 | // we include all active tasks, | |
265 | // and fill up to 1000 entries with finished tasks | |
266 | ||
267 | let max = 1000; | |
268 | ||
269 | let mut task_hash = HashMap::new(); | |
270 | ||
271 | for info in active_list { | |
272 | task_hash.insert(info.upid_str.clone(), info); | |
273 | } | |
274 | ||
275 | for info in finish_list { | |
276 | if task_hash.len() > max { break; } | |
277 | if !task_hash.contains_key(&info.upid_str) { | |
278 | task_hash.insert(info.upid_str.clone(), info); | |
279 | } | |
280 | } | |
281 | ||
93aebb38 DM |
282 | let mut task_list: Vec<TaskListInfo> = vec![]; |
283 | for (_, info) in task_hash { task_list.push(info); } | |
284 | ||
ba70040d | 285 | task_list.sort_unstable_by(|b, a| { // lastest on top |
4b01c983 DM |
286 | match (&a.state, &b.state) { |
287 | (Some(s1), Some(s2)) => s1.0.cmp(&s2.0), | |
288 | (Some(_), None) => std::cmp::Ordering::Less, | |
289 | (None, Some(_)) => std::cmp::Ordering::Greater, | |
290 | _ => a.upid.starttime.cmp(&b.upid.starttime), | |
291 | } | |
292 | }); | |
293 | ||
294 | let mut raw = String::new(); | |
295 | for info in &task_list { | |
296 | if let Some((endtime, status)) = &info.state { | |
297 | raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, endtime, status)); | |
298 | } else { | |
299 | raw.push_str(&info.upid_str); | |
300 | raw.push('\n'); | |
301 | } | |
302 | } | |
303 | ||
feaa1ad3 WB |
304 | replace_file( |
305 | PROXMOX_BACKUP_ACTIVE_TASK_FN, | |
306 | raw.as_bytes(), | |
307 | CreateOptions::new() | |
f74a03da DM |
308 | .owner(backup_user.uid) |
309 | .group(backup_user.gid), | |
feaa1ad3 | 310 | )?; |
4b01c983 DM |
311 | |
312 | drop(lock); | |
313 | ||
93aebb38 | 314 | Ok(task_list) |
4b01c983 DM |
315 | } |
316 | ||
93aebb38 DM |
317 | /// Returns a sorted list of known tasks |
318 | /// | |
319 | /// The list is sorted by `(starttime, endtime)` in ascending order | |
320 | pub fn read_task_list() -> Result<Vec<TaskListInfo>, Error> { | |
321 | update_active_workers(None) | |
322 | } | |
4b01c983 | 323 | |
882594c5 DM |
324 | /// Launch long running worker tasks. |
325 | /// | |
326 | /// A worker task can either be a whole thread, or a simply tokio | |
327 | /// task/future. Each task can `log()` messages, which are stored | |
328 | /// persistently to files. Task should poll the `abort_requested` | |
329 | /// flag, and stop execution when requested. | |
479f6e40 DM |
330 | #[derive(Debug)] |
331 | pub struct WorkerTask { | |
332 | upid: UPID, | |
333 | data: Mutex<WorkerTaskData>, | |
334 | abort_requested: AtomicBool, | |
335 | } | |
336 | ||
337 | impl std::fmt::Display for WorkerTask { | |
338 | ||
339 | fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { | |
340 | self.upid.fmt(f) | |
341 | } | |
342 | } | |
343 | ||
344 | #[derive(Debug)] | |
345 | struct WorkerTaskData { | |
346 | logger: FileLogger, | |
347 | progress: f64, // 0..1 | |
75bc49be | 348 | pub abort_listeners: Vec<oneshot::Sender<()>>, |
479f6e40 DM |
349 | } |
350 | ||
351 | impl Drop for WorkerTask { | |
352 | ||
353 | fn drop(&mut self) { | |
354 | println!("unregister worker"); | |
355 | } | |
356 | } | |
357 | ||
358 | impl WorkerTask { | |
359 | ||
418def7a | 360 | pub fn new(worker_type: &str, worker_id: Option<String>, username: &str, to_stdout: bool) -> Result<Arc<Self>, Error> { |
479f6e40 DM |
361 | println!("register worker"); |
362 | ||
634132fe DM |
363 | let upid = UPID::new(worker_type, worker_id, username)?; |
364 | let task_id = upid.task_id; | |
479f6e40 | 365 | |
634132fe | 366 | let mut path = std::path::PathBuf::from(PROXMOX_BACKUP_TASK_DIR); |
35950380 | 367 | |
479f6e40 DM |
368 | path.push(format!("{:02X}", upid.pstart % 256)); |
369 | ||
f74a03da | 370 | let backup_user = crate::backup::backup_user()?; |
35950380 | 371 | |
f74a03da | 372 | create_path(&path, None, Some(CreateOptions::new().owner(backup_user.uid).group(backup_user.gid)))?; |
479f6e40 DM |
373 | |
374 | path.push(upid.to_string()); | |
375 | ||
376 | println!("FILE: {:?}", path); | |
377 | ||
35950380 | 378 | let logger = FileLogger::new(&path, to_stdout)?; |
f74a03da | 379 | nix::unistd::chown(&path, Some(backup_user.uid), Some(backup_user.gid))?; |
479f6e40 | 380 | |
4b01c983 DM |
381 | update_active_workers(Some(&upid))?; |
382 | ||
479f6e40 | 383 | let worker = Arc::new(Self { |
653b1ca1 | 384 | upid, |
479f6e40 DM |
385 | abort_requested: AtomicBool::new(false), |
386 | data: Mutex::new(WorkerTaskData { | |
387 | logger, | |
388 | progress: 0.0, | |
75bc49be | 389 | abort_listeners: vec![], |
479f6e40 DM |
390 | }), |
391 | }); | |
392 | ||
7a630df7 DM |
393 | let mut hash = WORKER_TASK_LIST.lock().unwrap(); |
394 | ||
395 | hash.insert(task_id, worker.clone()); | |
396 | super::set_worker_count(hash.len()); | |
479f6e40 DM |
397 | |
398 | Ok(worker) | |
399 | } | |
400 | ||
882594c5 | 401 | /// Spawn a new tokio task/future. |
660c6846 DM |
402 | pub fn spawn<F, T>( |
403 | worker_type: &str, | |
404 | worker_id: Option<String>, | |
405 | username: &str, | |
406 | to_stdout: bool, | |
407 | f: F, | |
408 | ) -> Result<String, Error> | |
479f6e40 | 409 | where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T, |
75fef4b4 | 410 | T: Send + 'static + Future<Output = Result<(), Error>>, |
479f6e40 DM |
411 | { |
412 | let worker = WorkerTask::new(worker_type, worker_id, username, to_stdout)?; | |
660c6846 | 413 | let upid_str = worker.upid.to_string(); |
75fef4b4 WB |
414 | let f = f(worker.clone()); |
415 | tokio::spawn(async move { | |
416 | let result = f.await; | |
dd8e744f | 417 | worker.log_result(&result); |
75fef4b4 | 418 | }); |
479f6e40 | 419 | |
660c6846 | 420 | Ok(upid_str) |
479f6e40 DM |
421 | } |
422 | ||
882594c5 | 423 | /// Create a new worker thread. |
660c6846 DM |
424 | pub fn new_thread<F>( |
425 | worker_type: &str, | |
426 | worker_id: Option<String>, | |
427 | username: &str, | |
428 | to_stdout: bool, | |
429 | f: F, | |
430 | ) -> Result<String, Error> | |
d3f4c08f | 431 | where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error> |
479f6e40 DM |
432 | { |
433 | println!("register worker thread"); | |
434 | ||
435 | let (p, c) = oneshot::channel::<()>(); | |
436 | ||
437 | let worker = WorkerTask::new(worker_type, worker_id, username, to_stdout)?; | |
660c6846 | 438 | let upid_str = worker.upid.to_string(); |
479f6e40 | 439 | |
217170e1 | 440 | let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || { |
d3f4c08f DM |
441 | let worker1 = worker.clone(); |
442 | let result = match std::panic::catch_unwind(move || f(worker1)) { | |
443 | Ok(r) => r, | |
444 | Err(panic) => { | |
445 | match panic.downcast::<&str>() { | |
446 | Ok(panic_msg) => { | |
447 | Err(format_err!("worker panicked: {}", panic_msg)) | |
448 | } | |
449 | Err(_) => { | |
450 | Err(format_err!("worker panicked: unknown type.")) | |
451 | } | |
452 | } | |
453 | } | |
454 | }; | |
455 | ||
dd8e744f | 456 | worker.log_result(&result); |
479f6e40 DM |
457 | p.send(()).unwrap(); |
458 | }); | |
459 | ||
75fef4b4 | 460 | tokio::spawn(c.map(|_| ())); |
479f6e40 | 461 | |
660c6846 | 462 | Ok(upid_str) |
479f6e40 DM |
463 | } |
464 | ||
418def7a | 465 | /// Log task result, remove task from running list |
dd8e744f | 466 | pub fn log_result(&self, result: &Result<(), Error>) { |
418def7a | 467 | |
4b01c983 DM |
468 | if let Err(err) = result { |
469 | self.log(&format!("TASK ERROR: {}", err)); | |
470 | } else { | |
471 | self.log("TASK OK"); | |
472 | } | |
418def7a DM |
473 | |
474 | WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id); | |
475 | let _ = update_active_workers(None); | |
476 | super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len()); | |
4b01c983 DM |
477 | } |
478 | ||
882594c5 | 479 | /// Log a message. |
479f6e40 DM |
480 | pub fn log<S: AsRef<str>>(&self, msg: S) { |
481 | let mut data = self.data.lock().unwrap(); | |
482 | data.logger.log(msg); | |
483 | } | |
484 | ||
882594c5 | 485 | /// Set progress indicator |
479f6e40 DM |
486 | pub fn progress(&self, progress: f64) { |
487 | if progress >= 0.0 && progress <= 1.0 { | |
488 | let mut data = self.data.lock().unwrap(); | |
489 | data.progress = progress; | |
490 | } else { | |
491 | // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress); | |
492 | } | |
493 | } | |
494 | ||
882594c5 | 495 | /// Request abort |
d607b886 | 496 | pub fn request_abort(&self) { |
98a181f0 | 497 | eprintln!("set abort flag for worker {}", self.upid); |
479f6e40 | 498 | self.abort_requested.store(true, Ordering::SeqCst); |
75bc49be DM |
499 | // noitify listeners |
500 | let mut data = self.data.lock().unwrap(); | |
501 | loop { | |
502 | match data.abort_listeners.pop() { | |
503 | None => { break; }, | |
504 | Some(ch) => { | |
505 | let _ = ch.send(()); // ignore erros here | |
506 | }, | |
507 | } | |
508 | } | |
479f6e40 DM |
509 | } |
510 | ||
882594c5 | 511 | /// Test if abort was requested. |
479f6e40 DM |
512 | pub fn abort_requested(&self) -> bool { |
513 | self.abort_requested.load(Ordering::SeqCst) | |
514 | } | |
515 | ||
882594c5 | 516 | /// Fail if abort was requested. |
479f6e40 DM |
517 | pub fn fail_on_abort(&self) -> Result<(), Error> { |
518 | if self.abort_requested() { | |
519 | bail!("task '{}': abort requested - aborting task", self.upid); | |
520 | } | |
521 | Ok(()) | |
522 | } | |
75bc49be DM |
523 | |
524 | /// Get a future which resolves on task abort | |
525 | pub fn abort_future(&self) -> oneshot::Receiver<()> { | |
526 | let (tx, rx) = oneshot::channel::<()>(); | |
527 | ||
528 | let mut data = self.data.lock().unwrap(); | |
529 | if self.abort_requested() { | |
530 | let _ = tx.send(()); | |
531 | } else { | |
532 | data.abort_listeners.push(tx); | |
533 | } | |
534 | rx | |
535 | } | |
479f6e40 | 536 | } |