1 use anyhow
::{bail, format_err, Error}
;
3 use std
::collections
::HashMap
;
4 use std
::os
::unix
::io
::AsRawFd
;
5 use std
::path
::PathBuf
;
9 use tokio
::net
::UnixListener
;
10 use serde_json
::Value
;
13 /// Listens on a Unix socket to handle simple commands asynchronously.
///
/// Binds a `UnixListener` at `path` and builds a future that accepts
/// connections on it. For each accepted connection the peer credentials are
/// read and checked (uid 0, the gid of this process, or the gid of the backup
/// user are accepted). A task is then spawned for the connection which reads
/// it line by line, parses every line as a JSON `Value`, passes it to `func`
/// and writes back `OK: <result>` or `ERROR: <message>`, each terminated by a
/// newline.
///
/// Failures while accepting, reading credentials, reading a request or
/// writing a response are reported on stderr with `eprintln!`.
///
/// # Errors
///
/// Errors from `crate::backup::backup_user()` and from `UnixListener::bind`
/// are propagated to the caller.
14 fn create_control_socket
<P
, F
>(path
: P
, func
: F
) -> Result
<impl Future
<Output
= ()>, Error
>
17 F
: Fn(Value
) -> Result
<Value
, Error
> + Send
+ Sync
+ '
static,
// Take ownership of the path; it is also used in the log messages below.
19 let path
: PathBuf
= path
.into();
// Group id of the backup user, compared against the peer's gid in the
// permission check.
21 let backup_user
= crate::backup
::backup_user()?
;
22 let backup_gid
= backup_user
.gid
.as_raw();
24 let socket
= UnixListener
::bind(&path
)?
;
// Shared handle to the callback; it is cloned once per connection.
26 let func
= Arc
::new(func
);
28 let control_future
= async
move {
// Wait for the next client connection.
30 let (conn
, _addr
) = match socket
.accept().await
{
33 eprintln
!("failed to accept on control socket {:?}: {}", path
, err
);
// Query the credentials (uid/gid) of the connected peer.
// NOTE(review): `socket::sockopt` / `socket::getsockopt` are module paths,
// while `socket` is also the name of the local listener bound above —
// presumably the module is `nix::sys::socket`; confirm against the imports.
38 let opt
= socket
::sockopt
::PeerCredentials {}
;
39 let cred
= match socket
::getsockopt(conn
.as_raw_fd(), opt
) {
42 eprintln
!("no permissions - unable to read peer credential - {}", err
);
47 // check permissions (same gid, root user, or backup group)
// `getgid()` takes no arguments and only reads the group id of this process.
48 let mygid
= unsafe { libc::getgid() }
;
49 if !(cred
.uid() == 0 || cred
.gid() == mygid
|| cred
.gid() == backup_gid
) {
50 eprintln
!("no permissions for {:?}", cred
);
// Split the connection into a read half and a write half.
54 let (rx
, mut tx
) = tokio
::io
::split(conn
);
// Future derived from `super::last_worker_future()`; its output is discarded.
56 let abort_future
= super::last_worker_future().map(|_
| ());
58 use tokio
::io
::{AsyncBufReadExt, AsyncWriteExt}
;
// Per-connection copies of the callback handle and the path.
59 let func
= Arc
::clone(&func
);
60 let path
= path
.clone();
// Handle this connection in its own task, built with `futures::future::select`.
61 tokio
::spawn(futures
::future
::select(
// Buffer the read half for line-wise reading; `line` is reused for every
// request.
63 let mut rx
= tokio
::io
::BufReader
::new(rx
);
64 let mut line
= String
::new();
// Clear the buffer, then read the next request line into it.
67 match rx
.read_line({ line.clear(); &mut line }
).await
{
71 eprintln
!("control socket {:?} read error: {}", path
, err
);
// Parse the line as JSON and run the callback. Both a parse error and a
// callback error are turned into an `ERROR: ...` response line.
76 let response
= match line
.parse
::<Value
>() {
77 Ok(param
) => match func(param
) {
78 Ok(res
) => format
!("OK: {}\n", res
),
79 Err(err
) => format
!("ERROR: {}\n", err
),
81 Err(err
) => format
!("ERROR: {}\n", err
),
// Send the response; a write failure is only logged.
84 if let Err(err
) = tx
.write_all(response
.as_bytes()).await
{
85 eprintln
!("control socket {:?} write response error: {}", path
, err
);
// Second future derived from `super::last_worker_future()`, with its error
// value mapped to `()`.
95 let abort_future
= super::last_worker_future().map_err(|_
| {}
);
96 let task
= futures
::future
::select(
// Discard whichever side of the `select` completed so the resulting future
// yields `()`.
99 ).map(|_
: futures
::future
::Either
<(Result
<(), Error
>, _
), _
>| ());
/// Sends a single command to a control socket and reads the reply.
///
/// Connects a `tokio::net::UnixStream` to `path`, writes the JSON text of
/// `params` followed by a newline, shuts down the writer and then reads one
/// line of response. A reply starting with `OK: ` has its remainder parsed as
/// a JSON `Value`; a reply starting with `ERROR: ` is treated as an error
/// reply.
///
/// # Errors
///
/// Fails if the connection cannot be established, if writing or reading
/// fails, if the peer sends no data at all ("no response"), if the payload of
/// an `OK: ` reply is not valid JSON, or if the reply cannot be recognized.
105 pub async
fn send_command
<P
>(
108 ) -> Result
<Value
, Error
>
109 where P
: Into
<PathBuf
>,
111 let path
: PathBuf
= path
.into();
// Connect to the socket; a connect failure becomes a descriptive error.
113 tokio
::net
::UnixStream
::connect(path
)
114 .map_err(move |err
| format_err
!("control socket connect failed - {}", err
))
115 .and_then(move |mut conn
| {
// The request is the JSON text of `params` terminated by a newline.
117 let mut command_string
= params
.to_string();
118 command_string
.push('
\n'
);
121 use tokio
::io
::{AsyncBufReadExt, AsyncWriteExt}
;
// Send the request, then shut down the writer before reading the reply.
123 conn
.write_all(command_string
.as_bytes()).await?
;
124 AsyncWriteExt
::shutdown(&mut conn
).await?
;
125 let mut rx
= tokio
::io
::BufReader
::new(conn
);
126 let mut data
= String
::new();
// `read_line` returning 0 means the stream ended without any reply data.
127 if rx
.read_line(&mut data
).await?
== 0 {
128 bail
!("no response");
// Success replies carry a JSON value after the `OK: ` prefix.
130 if let Some(res
) = data
.strip_prefix("OK: ") {
131 match res
.parse
::<Value
>() {
133 Err(err
) => bail
!("unable to parse json response - {}", err
),
// Error replies are marked with the `ERROR: ` prefix.
135 } else if let Some(err
) = data
.strip_prefix("ERROR: ") {
// Reported when the reply could not be interpreted; the raw line is included.
138 bail
!("unable to parse response: {}", data
);
144 /// A callback for a specific commando socket.
///
/// A boxed, thread-safe (`Send + Sync + 'static`) callable that takes an
/// optional reference to a JSON `Value` and returns a JSON `Value` or an
/// error.
145 pub type CommandoSocketFn
= Box
<(dyn Fn(Option
<&Value
>) -> Result
<Value
, Error
> + Send
+ Sync
+ '
static)>;
147 /// Tooling to get a single control command socket where one can register multiple commands.
///
149 /// You need to call `spawn()` to make the socket active.
150 pub struct CommandoSocket
{
// Registered handlers, keyed by command name.
152 commands
: HashMap
<String
, CommandoSocketFn
>,
155 impl CommandoSocket
{
/// Creates a `CommandoSocket` for `path` with no commands registered yet.
156 pub fn new
<P
>(path
: P
) -> Self
157 where P
: Into
<PathBuf
>,
// Start with an empty command table; entries are added by `register_command`.
161 commands
: HashMap
::new(),
165 /// Spawn the socket and consume self, meaning you cannot register commands anymore after
/// this call.
///
/// Each request handled by the socket must contain a string field `command`
/// naming a registered command, and may carry an `args` field.
167 pub fn spawn(self) -> Result
<(), Error
> {
// Build the control socket future; the closure handles one parsed request.
168 let control_future
= create_control_socket(self.socket
.to_owned(), move |param
| {
// Requests that are not JSON objects are rejected with an error.
171 .ok_or_else(|| format_err
!("unable to parse parameters (expected json object)"))?
;
// `command` must be present and must be a JSON string.
173 let command
= match param
.get("command") {
174 Some(Value
::String(command
)) => command
.as_str(),
175 None
=> bail
!("no command"),
176 _
=> bail
!("unable to parse command"),
// NOTE(review): this `contains_key` check reports the same error as the
// `None` arm of the `get` below, so the command is looked up twice.
179 if !self.commands
.contains_key(command
) {
180 bail
!("got unknown command '{}'", command
);
183 match self.commands
.get(command
) {
184 None
=> bail
!("got unknown command '{}'", command
),
// Optional arguments of the request; kept as an `Option` instead of
// defaulting to `Value::Null`.
186 let args
= param
.get("args"); //.unwrap_or(&Value::Null);
// Run the control future as a background task on the tokio runtime.
192 tokio
::spawn(control_future
);
197 /// Register a new command with a callback.
///
/// The handler is boxed and stored under the name `command`.
///
/// # Errors
///
/// Returns an error if a command with the same name is already registered.
198 pub fn register_command
<F
>(
202 ) -> Result
<(), Error
>
204 F
: Fn(Option
<&Value
>) -> Result
<Value
, Error
> + Send
+ Sync
+ '
static,
// Refuse to overwrite an existing registration.
207 if self.commands
.contains_key(&command
) {
208 bail
!("command '{}' already exists!", command
);
// Store the handler as a boxed trait object.
211 self.commands
.insert(command
, Box
::new(handler
));