]> git.proxmox.com Git - proxmox-backup.git/commitdiff
src/server/worker_task.rs: implement abort_worker (via command_socket)
authorDietmar Maurer <dietmar@proxmox.com>
Wed, 10 Apr 2019 10:42:24 +0000 (12:42 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Wed, 10 Apr 2019 10:42:24 +0000 (12:42 +0200)
src/server/command_socket.rs
src/server/worker_task.rs

index 92a1322cd97a49861bc9a50e659fc49ea71b3881..57a54a3a9c333436140ec0586b9d8536ebb75c8a 100644 (file)
@@ -70,3 +70,52 @@ pub fn create_control_socket<P, F>(path: P, f: F) -> Result<impl Future<Item=(),
 
     Ok(task)
 }
+
+
+pub fn send_command<P>(
+    path: P,
+    params: Value
+) -> impl Future<Item=Value, Error=Error>
+    where P: Into<PathBuf>,
+
+{
+    let path: PathBuf = path.into();
+
+    tokio::net::UnixStream::connect(path)
+        .map_err(move |err| format_err!("control socket connect failed - {}", err))
+        .and_then(move |conn| {
+
+            let (rx, tx) = conn.split();
+
+            let mut command_string = params.to_string();
+            command_string.push('\n');
+
+            tokio::io::write_all(tx, command_string)
+                .and_then(|(tx,_)| tokio::io::shutdown(tx))
+                .map_err(|err| format_err!("control socket write error - {}", err))
+                .and_then(move |_| {
+                    tokio::io::lines(std::io::BufReader::new(rx))
+                        .into_future()
+                        .then(|test| {
+                            match test {
+                                Ok((Some(data), _)) => {
+                                    if data.starts_with("OK: ") {
+                                        match data[4..].parse::<Value>() {
+                                            Ok(v) => Ok(v),
+                                            Err(err) => bail!("unable to parse json response - {}", err),
+                                        }
+                                    } else if data.starts_with("ERROR: ") {
+                                        bail!("{}", &data[7..]);
+                                    } else {
+                                        bail!("unable to parse response: {}", data);
+                                    }
+                                }
+                                Ok((None, _)) => {
+                                    bail!("no response");
+                                }
+                                Err((err, _)) => Err(Error::from(err)),
+                            }
+                        })
+                })
+        })
+}
index fd8a144000f6df1e45ede330a1eec1109b31735c..6140a463b21ac1d6d906a1e7c55ed3db6d1a67f2 100644 (file)
@@ -9,7 +9,7 @@ use std::collections::HashMap;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::io::{BufRead, BufReader};
 use std::fs::File;
-use serde_json::Value;
+use serde_json::{json, Value};
 
 use super::UPID;
 
@@ -57,7 +57,7 @@ pub fn create_task_control_socket() -> Result<(), Error> {
     let control_future = super::create_control_socket(socketname, |param| {
         let param = param.as_object()
             .ok_or(format_err!("unable to parse parameters (expected json object)"))?;
-        if param.keys().count() != 2 { bail!("worng number of parameters"); }
+        if param.keys().count() != 2 { bail!("wrong number of parameters"); }
 
         let command = param.get("command")
             .ok_or(format_err!("unable to parse parameters (missing command)"))?;
@@ -88,6 +88,32 @@ pub fn create_task_control_socket() -> Result<(), Error> {
     Ok(())
 }
 
+pub fn abort_worker_async(upid: UPID) {
+    let task = abort_worker(upid);
+
+    tokio::spawn(task.then(|res| {
+        if let Err(err) = res {
+            eprintln!("abort worker failed - {}", err);
+        }
+        Ok(())
+    }));
+}
+
+pub fn abort_worker(upid: UPID) -> impl Future<Item=(), Error=Error> {
+
+    let target_pid = upid.pid;
+
+    let socketname = format!(
+        "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, target_pid);
+
+    let cmd = json!({
+        "command": "abort-task",
+        "upid": upid.to_string(),
+    });
+
+    super::send_command(socketname, cmd).map(|_| {})
+}
+
 fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, String)>), Error> {
 
     let data = line.splitn(3, ' ').collect::<Vec<&str>>();