use anyhow::{bail, format_err, Error};
-use futures::*;
+use std::collections::HashMap;
+use std::os::unix::io::AsRawFd;
+use std::path::{PathBuf, Path};
+use std::sync::Arc;
+use futures::*;
use tokio::net::UnixListener;
-
-use std::path::PathBuf;
+use serde::Serialize;
use serde_json::Value;
-use std::sync::Arc;
-use std::os::unix::io::AsRawFd;
use nix::sys::socket;
/// Listens on a Unix Socket to handle simple command asynchronously
-pub fn create_control_socket<P, F>(path: P, func: F) -> Result<impl Future<Output = ()>, Error>
+fn create_control_socket<P, F>(path: P, func: F) -> Result<impl Future<Output = ()>, Error>
where
P: Into<PathBuf>,
F: Fn(Value) -> Result<Value, Error> + Send + Sync + 'static,
{
let path: PathBuf = path.into();
- let mut socket = UnixListener::bind(&path)?;
+ let backup_user = crate::backup::backup_user()?;
+ let backup_gid = backup_user.gid.as_raw();
+
+ let socket = UnixListener::bind(&path)?;
let func = Arc::new(func);
let control_future = async move {
loop {
- let (conn, _addr) = socket
- .accept()
- .await
- .map_err(|err| {
- format_err!("failed to accept on control socket {:?}: {}", path, err)
- })?;
-
- // check permissions (same gid, or root user)
+ let (conn, _addr) = match socket.accept().await {
+ Ok(data) => data,
+ Err(err) => {
+ eprintln!("failed to accept on control socket {:?}: {}", path, err);
+ continue;
+ }
+ };
+
let opt = socket::sockopt::PeerCredentials {};
- match socket::getsockopt(conn.as_raw_fd(), opt) {
- Ok(cred) => {
- let mygid = unsafe { libc::getgid() };
- if !(cred.uid() == 0 || cred.gid() == mygid) {
- bail!("no permissions for {:?}", cred);
- }
+ let cred = match socket::getsockopt(conn.as_raw_fd(), opt) {
+ Ok(cred) => cred,
+ Err(err) => {
+ eprintln!("no permissions - unable to read peer credential - {}", err);
+ continue;
}
- Err(e) => bail!("no permissions - unable to read peer credential - {}", e),
+ };
+
+ // check permissions (same gid, root user, or backup group)
+ let mygid = unsafe { libc::getgid() };
+ if !(cred.uid() == 0 || cred.gid() == mygid || cred.gid() == backup_gid) {
+ eprintln!("no permissions for {:?}", cred);
+ continue;
}
let (rx, mut tx) = tokio::io::split(conn);
}
-pub fn send_command<P>(
- path: P,
- params: Value
-) -> impl Future<Output = Result<Value, Error>>
- where P: Into<PathBuf>,
+pub async fn send_command<P, T>(path: P, params: &T) -> Result<Value, Error>
+where
+ P: AsRef<Path>,
+ T: ?Sized + Serialize,
+{
+ let mut command_string = serde_json::to_string(params)?;
+ command_string.push('\n');
+ send_raw_command(path.as_ref(), &command_string).await
+}
+pub async fn send_raw_command<P>(path: P, command_string: &str) -> Result<Value, Error>
+where
+ P: AsRef<Path>,
{
- let path: PathBuf = path.into();
+ use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
- tokio::net::UnixStream::connect(path)
+ let mut conn = tokio::net::UnixStream::connect(path)
.map_err(move |err| format_err!("control socket connect failed - {}", err))
- .and_then(move |mut conn| {
+ .await?;
+
+ conn.write_all(command_string.as_bytes()).await?;
+ if !command_string.as_bytes().ends_with(b"\n") {
+ conn.write_all(b"\n").await?;
+ }
+
+ AsyncWriteExt::shutdown(&mut conn).await?;
+ let mut rx = tokio::io::BufReader::new(conn);
+ let mut data = String::new();
+ if rx.read_line(&mut data).await? == 0 {
+ bail!("no response");
+ }
+ if let Some(res) = data.strip_prefix("OK: ") {
+ match res.parse::<Value>() {
+ Ok(v) => Ok(v),
+ Err(err) => bail!("unable to parse json response - {}", err),
+ }
+ } else if let Some(err) = data.strip_prefix("ERROR: ") {
+ bail!("{}", err);
+ } else {
+ bail!("unable to parse response: {}", data);
+ }
+}
- let mut command_string = params.to_string();
- command_string.push('\n');
+/// A callback for a specific commando socket.
+pub type CommandoSocketFn = Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
- async move {
- use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
+/// Tooling to get a single control command socket where one can register multiple commands
+/// dynamically.
+/// You need to call `spawn()` to make the socket active.
+pub struct CommandoSocket {
+ socket: PathBuf,
+ commands: HashMap<String, CommandoSocketFn>,
+}
- conn.write_all(command_string.as_bytes()).await?;
- AsyncWriteExt::shutdown(&mut conn).await?;
- let mut rx = tokio::io::BufReader::new(conn);
- let mut data = String::new();
- if rx.read_line(&mut data).await? == 0 {
- bail!("no response");
- }
- if data.starts_with("OK: ") {
- match data[4..].parse::<Value>() {
- Ok(v) => Ok(v),
- Err(err) => bail!("unable to parse json response - {}", err),
- }
- } else if data.starts_with("ERROR: ") {
- bail!("{}", &data[7..]);
- } else {
- bail!("unable to parse response: {}", data);
- }
+impl CommandoSocket {
+ pub fn new<P>(path: P) -> Self
+ where P: Into<PathBuf>,
+ {
+ CommandoSocket {
+ socket: path.into(),
+ commands: HashMap::new(),
+ }
+ }
+
+ /// Spawn the socket and consume self, meaning you cannot register commands anymore after
+ /// calling this.
+ pub fn spawn(self) -> Result<(), Error> {
+ let control_future = create_control_socket(self.socket.to_owned(), move |param| {
+ let param = param
+ .as_object()
+ .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
+
+ let command = match param.get("command") {
+ Some(Value::String(command)) => command.as_str(),
+ None => bail!("no command"),
+ _ => bail!("unable to parse command"),
+ };
+
+ if !self.commands.contains_key(command) {
+ bail!("got unknown command '{}'", command);
+ }
+
+ match self.commands.get(command) {
+ None => bail!("got unknown command '{}'", command),
+ Some(handler) => {
+ let args = param.get("args"); //.unwrap_or(&Value::Null);
+ (handler)(args)
+ },
}
- })
+ })?;
+
+ tokio::spawn(control_future);
+
+ Ok(())
+ }
+
+ /// Register a new command with a callback.
+ pub fn register_command<F>(
+ &mut self,
+ command: String,
+ handler: F,
+ ) -> Result<(), Error>
+ where
+ F: Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static,
+ {
+
+ if self.commands.contains_key(&command) {
+ bail!("command '{}' already exists!", command);
+ }
+
+ self.commands.insert(command, Box::new(handler));
+
+ Ok(())
+ }
}