1 use anyhow
::{bail, format_err, Error}
;
3 use std
::collections
::HashMap
;
4 use std
::os
::unix
::io
::AsRawFd
;
5 use std
::path
::{PathBuf, Path}
;
9 use tokio
::net
::UnixListener
;
11 use serde_json
::Value
;
14 /// Listens on a Unix Socket to handle simple command asynchronously
15 fn create_control_socket
<P
, F
>(path
: P
, func
: F
) -> Result
<impl Future
<Output
= ()>, Error
>
18 F
: Fn(Value
) -> Result
<Value
, Error
> + Send
+ Sync
+ '
static,
20 let path
: PathBuf
= path
.into();
22 let backup_user
= crate::backup
::backup_user()?
;
23 let backup_gid
= backup_user
.gid
.as_raw();
25 let socket
= UnixListener
::bind(&path
)?
;
27 let func
= Arc
::new(func
);
29 let control_future
= async
move {
31 let (conn
, _addr
) = match socket
.accept().await
{
34 eprintln
!("failed to accept on control socket {:?}: {}", path
, err
);
39 let opt
= socket
::sockopt
::PeerCredentials {}
;
40 let cred
= match socket
::getsockopt(conn
.as_raw_fd(), opt
) {
43 eprintln
!("no permissions - unable to read peer credential - {}", err
);
48 // check permissions (same gid, root user, or backup group)
49 let mygid
= unsafe { libc::getgid() }
;
50 if !(cred
.uid() == 0 || cred
.gid() == mygid
|| cred
.gid() == backup_gid
) {
51 eprintln
!("no permissions for {:?}", cred
);
55 let (rx
, mut tx
) = tokio
::io
::split(conn
);
57 let abort_future
= super::last_worker_future().map(|_
| ());
59 use tokio
::io
::{AsyncBufReadExt, AsyncWriteExt}
;
60 let func
= Arc
::clone(&func
);
61 let path
= path
.clone();
62 tokio
::spawn(futures
::future
::select(
64 let mut rx
= tokio
::io
::BufReader
::new(rx
);
65 let mut line
= String
::new();
68 match rx
.read_line({ line.clear(); &mut line }
).await
{
72 eprintln
!("control socket {:?} read error: {}", path
, err
);
77 let response
= match line
.parse
::<Value
>() {
78 Ok(param
) => match func(param
) {
79 Ok(res
) => format
!("OK: {}\n", res
),
80 Err(err
) => format
!("ERROR: {}\n", err
),
82 Err(err
) => format
!("ERROR: {}\n", err
),
85 if let Err(err
) = tx
.write_all(response
.as_bytes()).await
{
86 eprintln
!("control socket {:?} write response error: {}", path
, err
);
96 let abort_future
= super::last_worker_future().map_err(|_
| {}
);
97 let task
= futures
::future
::select(
100 ).map(|_
: futures
::future
::Either
<(Result
<(), Error
>, _
), _
>| ());
106 pub async
fn send_command
<P
, T
>(path
: P
, params
: &T
) -> Result
<Value
, Error
>
109 T
: ?Sized
+ Serialize
,
111 let mut command_string
= serde_json
::to_string(params
)?
;
112 command_string
.push('
\n'
);
113 send_raw_command(path
.as_ref(), &command_string
).await
116 pub async
fn send_raw_command
<P
>(path
: P
, command_string
: &str) -> Result
<Value
, Error
>
120 use tokio
::io
::{AsyncBufReadExt, AsyncWriteExt}
;
122 let mut conn
= tokio
::net
::UnixStream
::connect(path
)
123 .map_err(move |err
| format_err
!("control socket connect failed - {}", err
))
126 conn
.write_all(command_string
.as_bytes()).await?
;
127 if !command_string
.as_bytes().ends_with(b
"\n") {
128 conn
.write_all(b
"\n").await?
;
131 AsyncWriteExt
::shutdown(&mut conn
).await?
;
132 let mut rx
= tokio
::io
::BufReader
::new(conn
);
133 let mut data
= String
::new();
134 if rx
.read_line(&mut data
).await?
== 0 {
135 bail
!("no response");
137 if let Some(res
) = data
.strip_prefix("OK: ") {
138 match res
.parse
::<Value
>() {
140 Err(err
) => bail
!("unable to parse json response - {}", err
),
142 } else if let Some(err
) = data
.strip_prefix("ERROR: ") {
145 bail
!("unable to parse response: {}", data
);
149 /// A callback for a specific commando socket.
150 pub type CommandoSocketFn
= Box
<(dyn Fn(Option
<&Value
>) -> Result
<Value
, Error
> + Send
+ Sync
+ '
static)>;
152 /// Tooling to get a single control command socket where one can register multiple commands
154 /// You need to call `spawn()` to make the socket active.
155 pub struct CommandoSocket
{
157 commands
: HashMap
<String
, CommandoSocketFn
>,
160 impl CommandoSocket
{
161 pub fn new
<P
>(path
: P
) -> Self
162 where P
: Into
<PathBuf
>,
166 commands
: HashMap
::new(),
170 /// Spawn the socket and consume self, meaning you cannot register commands anymore after
172 pub fn spawn(self) -> Result
<(), Error
> {
173 let control_future
= create_control_socket(self.socket
.to_owned(), move |param
| {
176 .ok_or_else(|| format_err
!("unable to parse parameters (expected json object)"))?
;
178 let command
= match param
.get("command") {
179 Some(Value
::String(command
)) => command
.as_str(),
180 None
=> bail
!("no command"),
181 _
=> bail
!("unable to parse command"),
184 if !self.commands
.contains_key(command
) {
185 bail
!("got unknown command '{}'", command
);
188 match self.commands
.get(command
) {
189 None
=> bail
!("got unknown command '{}'", command
),
191 let args
= param
.get("args"); //.unwrap_or(&Value::Null);
197 tokio
::spawn(control_future
);
202 /// Register a new command with a callback.
203 pub fn register_command
<F
>(
207 ) -> Result
<(), Error
>
209 F
: Fn(Option
<&Value
>) -> Result
<Value
, Error
> + Send
+ Sync
+ '
static,
212 if self.commands
.contains_key(&command
) {
213 bail
!("command '{}' already exists!", command
);
216 self.commands
.insert(command
, Box
::new(handler
));