]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/server/command_socket.rs
refactor send_command
[proxmox-backup.git] / src / server / command_socket.rs
index 6baefe192f0062e95d84accd9f0547fc2dc6f51c..af41dd166e32b21dcef90694f651ea2dc1a13121 100644 (file)
@@ -1,46 +1,55 @@
-use failure::*;
+use anyhow::{bail, format_err, Error};
 
-use futures::*;
+use std::collections::HashMap;
+use std::os::unix::io::AsRawFd;
+use std::path::{PathBuf, Path};
+use std::sync::Arc;
 
+use futures::*;
 use tokio::net::UnixListener;
-
-use std::path::PathBuf;
+use serde::Serialize;
 use serde_json::Value;
-use std::sync::Arc;
-use std::os::unix::io::AsRawFd;
 use nix::sys::socket;
 
 /// Listens on a Unix Socket to handle simple command asynchronously
-pub fn create_control_socket<P, F>(path: P, func: F) -> Result<impl Future<Output = ()>, Error>
+fn create_control_socket<P, F>(path: P, func: F) -> Result<impl Future<Output = ()>, Error>
 where
     P: Into<PathBuf>,
     F: Fn(Value) -> Result<Value, Error> + Send + Sync + 'static,
 {
     let path: PathBuf = path.into();
 
-    let mut socket = UnixListener::bind(&path)?;
+    let backup_user = crate::backup::backup_user()?;
+    let backup_gid = backup_user.gid.as_raw();
+
+    let socket = UnixListener::bind(&path)?;
 
     let func = Arc::new(func);
 
     let control_future = async move {
         loop {
-            let (conn, _addr) = socket
-                .accept()
-                .await
-                .map_err(|err| {
-                    format_err!("failed to accept on control socket {:?}: {}", path, err)
-                })?;
-
-            // check permissions (same gid, or root user)
+            let (conn, _addr) = match socket.accept().await {
+                Ok(data) => data,
+                Err(err) => {
+                    eprintln!("failed to accept on control socket {:?}: {}", path, err);
+                    continue;
+                }
+            };
+
             let opt = socket::sockopt::PeerCredentials {};
-            match socket::getsockopt(conn.as_raw_fd(), opt) {
-                Ok(cred) => {
-                    let mygid = unsafe { libc::getgid() };
-                    if !(cred.uid() == 0 || cred.gid() == mygid) {
-                        bail!("no permissions for {:?}", cred);
-                    }
+            let cred = match socket::getsockopt(conn.as_raw_fd(), opt) {
+                Ok(cred) => cred,
+                Err(err) => {
+                    eprintln!("no permissions - unable to read peer credential - {}", err);
+                    continue;
                 }
-                Err(e) => bail!("no permissions - unable to read peer credential - {}", e),
+            };
+
+            // check permissions (same gid, root user, or backup group)
+            let mygid = unsafe { libc::getgid() };
+            if !(cred.uid() == 0 || cred.gid() == mygid || cred.gid() == backup_gid) {
+                eprintln!("no permissions for {:?}", cred);
+                continue;
             }
 
             let (rx, mut tx) = tokio::io::split(conn);
@@ -94,42 +103,118 @@ where
 }
 
 
-pub fn send_command<P>(
-    path: P,
-    params: Value
-) -> impl Future<Output = Result<Value, Error>>
-    where P: Into<PathBuf>,
+pub async fn send_command<P, T>(path: P, params: &T) -> Result<Value, Error>
+where
+    P: AsRef<Path>,
+    T: ?Sized + Serialize,
+{
+    let mut command_string = serde_json::to_string(params)?;
+    command_string.push('\n');
+    send_raw_command(path.as_ref(), &command_string).await
+}
 
+pub async fn send_raw_command<P>(path: P, command_string: &str) -> Result<Value, Error>
+where
+    P: AsRef<Path>,
 {
-    let path: PathBuf = path.into();
+    use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
 
-    tokio::net::UnixStream::connect(path)
+    let mut conn = tokio::net::UnixStream::connect(path)
         .map_err(move |err| format_err!("control socket connect failed - {}", err))
-        .and_then(move |mut conn| {
+        .await?;
+
+    conn.write_all(command_string.as_bytes()).await?;
+    if !command_string.as_bytes().ends_with(b"\n") {
+        conn.write_all(b"\n").await?;
+    }
+
+    AsyncWriteExt::shutdown(&mut conn).await?;
+    let mut rx = tokio::io::BufReader::new(conn);
+    let mut data = String::new();
+    if rx.read_line(&mut data).await? == 0 {
+        bail!("no response");
+    }
+    if let Some(res) = data.strip_prefix("OK: ") {
+        match res.parse::<Value>() {
+            Ok(v) => Ok(v),
+            Err(err) => bail!("unable to parse json response - {}", err),
+        }
+    } else if let Some(err) = data.strip_prefix("ERROR: ") {
+        bail!("{}", err);
+    } else {
+        bail!("unable to parse response: {}", data);
+    }
+}
 
-            let mut command_string = params.to_string();
-            command_string.push('\n');
+/// A callback for a specific commando socket.
+pub type CommandoSocketFn = Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
 
-            async move {
-                use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
+/// Tooling to get a single control command socket where one can register multiple commands
+/// dynamically.
+/// You need to call `spawn()` to make the socket active.
+pub struct CommandoSocket {
+    socket: PathBuf,
+    commands: HashMap<String, CommandoSocketFn>,
+}
 
-                conn.write_all(command_string.as_bytes()).await?;
-                AsyncWriteExt::shutdown(&mut conn).await?;
-                let mut rx = tokio::io::BufReader::new(conn);
-                let mut data = String::new();
-                if rx.read_line(&mut data).await? == 0 {
-                    bail!("no response");
-                }
-                if data.starts_with("OK: ") {
-                    match data[4..].parse::<Value>() {
-                        Ok(v) => Ok(v),
-                        Err(err) => bail!("unable to parse json response - {}", err),
-                    }
-                } else if data.starts_with("ERROR: ") {
-                    bail!("{}", &data[7..]);
-                } else {
-                    bail!("unable to parse response: {}", data);
-                }
+impl CommandoSocket {
+    pub fn new<P>(path: P) -> Self
+        where P: Into<PathBuf>,
+    {
+        CommandoSocket {
+            socket: path.into(),
+            commands: HashMap::new(),
+        }
+    }
+
+    /// Spawn the socket and consume self, meaning you cannot register commands anymore after
+    /// calling this.
+    pub fn spawn(self) -> Result<(), Error> {
+        let control_future = create_control_socket(self.socket.to_owned(), move |param| {
+            let param = param
+                .as_object()
+                .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
+
+            let command = match param.get("command") {
+                Some(Value::String(command)) => command.as_str(),
+                None => bail!("no command"),
+                _ => bail!("unable to parse command"),
+            };
+
+            if !self.commands.contains_key(command) {
+                bail!("got unknown command '{}'", command);
+            }
+
+            match self.commands.get(command) {
+                None => bail!("got unknown command '{}'", command),
+                Some(handler) => {
+                    let args = param.get("args"); //.unwrap_or(&Value::Null);
+                    (handler)(args)
+                },
             }
-        })
+        })?;
+
+        tokio::spawn(control_future);
+
+        Ok(())
+    }
+
+    /// Register a new command with a callback.
+    pub fn register_command<F>(
+        &mut self,
+        command: String,
+        handler: F,
+    ) -> Result<(), Error>
+    where
+        F: Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static,
+    {
+
+        if self.commands.contains_key(&command) {
+            bail!("command '{}' already exists!", command);
+        }
+
+        self.commands.insert(command, Box::new(handler));
+
+        Ok(())
+    }
 }