1 use anyhow
::{bail, format_err, Error}
;
3 use std
::collections
::HashMap
;
4 use std
::os
::unix
::io
::AsRawFd
;
5 use std
::path
::{PathBuf, Path}
;
9 use tokio
::net
::UnixListener
;
11 use serde_json
::Value
;
15 // Listens on a Unix socket and handles simple commands asynchronously.
//
// Binds a `UnixListener` to `path` and returns a future that serves it; the
// returned future does nothing until the caller polls or spawns it.
//
// * `path` - filesystem path the listener is bound to.
// * `gid`  - additional group id allowed to connect, next to root (uid 0)
//            and the gid of the current process.
// * `func` - callback invoked with each request line parsed as JSON. An `Ok`
//            result is written back as `OK: <json>`, an error as
//            `ERROR: <message>`, each followed by a newline.
//
// Returns an error if binding the socket fails.
//
// NOTE(review): several lines of this function (match arms, loop headers,
// the arguments of the `select` calls) are not visible in this view; the
// comments below only describe what the visible statements do.
16 fn create_control_socket
<P
, F
>(path
: P
, gid
: Gid
, func
: F
) -> Result
<impl Future
<Output
= ()>, Error
>
19 F
: Fn(Value
) -> Result
<Value
, Error
> + Send
+ Sync
+ '
static,
21 let path
: PathBuf
= path
.into();
// Raw numeric gid, compared against the peer credentials further down.
23 let gid
= gid
.as_raw();
25 let socket
= UnixListener
::bind(&path
)?
;
// Wrapped in an Arc so that every connection task can hold its own handle
// to the callback (see the `Arc::clone` before `tokio::spawn`).
27 let func
= Arc
::new(func
);
29 let control_future
= async
move {
31 let (conn
, _addr
) = match socket
.accept().await
{
34 eprintln
!("failed to accept on control socket {:?}: {}", path
, err
);
// NOTE(review): the `socket::` paths below name a module, not the local
// `socket` listener. Its import is outside the visible lines (presumably
// nix::sys::socket - confirm).
39 let opt
= socket
::sockopt
::PeerCredentials {}
;
40 let cred
= match socket
::getsockopt(conn
.as_raw_fd(), opt
) {
43 eprintln
!("no permissions - unable to read peer credential - {}", err
);
48 // check permissions (same gid, root user, or backup group)
// The third alternative compares against the `gid` argument of this function.
49 let mygid
= unsafe { libc::getgid() }
;
50 if !(cred
.uid() == 0 || cred
.gid() == mygid
|| cred
.gid() == gid
) {
51 eprintln
!("no permissions for {:?}", cred
);
// Separate read and write halves of the accepted connection.
55 let (rx
, mut tx
) = tokio
::io
::split(conn
);
// NOTE(review): this uses `super::last_worker_future()` while the outer task
// below uses `crate::last_worker_future()` - confirm both name the same
// function.
57 let abort_future
= super::last_worker_future().map(|_
| ());
59 use tokio
::io
::{AsyncBufReadExt, AsyncWriteExt}
;
// Per-connection copies, so they can be used inside the spawned task.
60 let func
= Arc
::clone(&func
);
61 let path
= path
.clone();
62 tokio
::spawn(futures
::future
::select(
64 let mut rx
= tokio
::io
::BufReader
::new(rx
);
65 let mut line
= String
::new();
// The block expression empties the buffer right before each read, so `line`
// only ever holds the current request.
68 match rx
.read_line({ line.clear(); &mut line }
).await
{
72 eprintln
!("control socket {:?} read error: {}", path
, err
);
// JSON parse errors and callback errors are both reported to the peer as an
// `ERROR: ...` line; only a successful callback yields `OK: ...`.
77 let response
= match line
.parse
::<Value
>() {
78 Ok(param
) => match func(param
) {
79 Ok(res
) => format
!("OK: {}\n", res
),
80 Err(err
) => format
!("ERROR: {}\n", err
),
82 Err(err
) => format
!("ERROR: {}\n", err
),
85 if let Err(err
) = tx
.write_all(response
.as_bytes()).await
{
86 eprintln
!("control socket {:?} write response error: {}", path
, err
);
// Errors of the worker future are mapped to `()`.
96 let abort_future
= crate::last_worker_future().map_err(|_
| {}
);
// The outcome of the select is discarded by mapping it to `()`; the closure
// parameter is annotated only to pin down the `Either` type.
97 let task
= futures
::future
::select(
100 ).map(|_
: futures
::future
::Either
<(Result
<(), Error
>, _
), _
>| ());
105 /// Send a command to the specified socket
///
/// Serializes `params` to a JSON string, appends a newline and passes the
/// result to `send_raw_command`, returning whatever that call returns.
///
/// # Errors
///
/// Fails if `params` cannot be serialized to JSON or if `send_raw_command`
/// fails.
106 pub async
fn send_command
<P
, T
>(path
: P
, params
: &T
) -> Result
<Value
, Error
>
109 T
: ?Sized
+ Serialize
,
111 let mut command_string
= serde_json
::to_string(params
)?
;
// Terminate the request with a newline before handing it on.
112 command_string
.push('
\n'
);
113 send_raw_command(path
.as_ref(), &command_string
).await
116 /// Send a raw command (string) to the specified socket
///
/// Connects to the Unix socket at `path`, writes `command_string` (adding a
/// trailing newline if it does not already end with one), shuts down the
/// writer and then reads a single response line.
///
/// A response starting with `OK: ` has the rest of the line parsed as JSON.
/// A response starting with `ERROR: ` is handled in a separate branch whose
/// body is not visible in this view (presumably it is turned into an error -
/// confirm). Any other response is rejected.
///
/// # Errors
///
/// Visible failure cases: connecting fails (`control socket connect failed`),
/// writing or shutting down fails, the peer sends nothing (`no response`),
/// the `OK: ` payload is not valid JSON (`unable to parse json response`),
/// or the response has neither known prefix (`unable to parse response`).
117 pub async
fn send_raw_command
<P
>(path
: P
, command_string
: &str) -> Result
<Value
, Error
>
121 use tokio
::io
::{AsyncBufReadExt, AsyncWriteExt}
;
123 let mut conn
= tokio
::net
::UnixStream
::connect(path
)
124 .map_err(move |err
| format_err
!("control socket connect failed - {}", err
))
127 conn
.write_all(command_string
.as_bytes()).await?
;
// Add the line terminator only when the caller did not supply one.
128 if !command_string
.as_bytes().ends_with(b
"\n") {
129 conn
.write_all(b
"\n").await?
;
// Done sending: shut down the writer before reading the reply.
132 AsyncWriteExt
::shutdown(&mut conn
).await?
;
133 let mut rx
= tokio
::io
::BufReader
::new(conn
);
134 let mut data
= String
::new();
// read_line yields 0 bytes at end of stream, i.e. no reply was sent.
135 if rx
.read_line(&mut data
).await?
== 0 {
136 bail
!("no response");
138 if let Some(res
) = data
.strip_prefix("OK: ") {
139 match res
.parse
::<Value
>() {
141 Err(err
) => bail
!("unable to parse json response - {}", err
),
143 } else if let Some(err
) = data
.strip_prefix("ERROR: ") {
// Neither prefix matched: report the whole line back to the caller.
146 bail
!("unable to parse response: {}", data
);
150 // A callback for a specific commando socket.
//
// A boxed closure that is `Send + Sync` with a static lifetime bound. It
// takes an optional reference to a JSON value and returns a JSON value or an
// error. `CommandoSocket::spawn` looks up the `args` entry of a request,
// which is presumably what gets passed in here (the call itself is not
// visible in this view - confirm).
151 type CommandoSocketFn
= Box
<(dyn Fn(Option
<&Value
>) -> Result
<Value
, Error
> + Send
+ Sync
+ '
static)>;
153 /// Tooling to get a single control command socket where one can
154 /// register multiple commands dynamically.
///
156 /// You need to call `spawn()` to make the socket active.
157 pub struct CommandoSocket
{
// Registered handlers, keyed by command name (filled by `register_command`,
// looked up by `spawn`).
160 commands
: HashMap
<String
, CommandoSocketFn
>,
163 impl CommandoSocket
{
/// Creates a new `CommandoSocket` from a socket `path` and a `gid`.
///
/// The command table starts out empty. How `path` and `gid` are stored is
/// not visible in this view; `spawn` later reads them as `self.socket` and
/// `self.gid`.
164 pub fn new
<P
>(path
: P
, gid
: Gid
) -> Self
165 where P
: Into
<PathBuf
>,
// No commands are registered initially.
170 commands
: HashMap
::new(),
174 /// Spawn the socket and consume self, meaning you cannot register commands anymore after
/// calling this method.
///
/// Creates the control socket through `create_control_socket`, using
/// `self.socket` and `self.gid`, and spawns the resulting future with
/// `tokio::spawn`. The closure passed along moves `self` (and with it the
/// command table) into the socket handler.
///
/// Each request is expected to be a JSON object with a string `command`
/// entry. The optional `args` entry is looked up as well; the handler
/// invocation itself is not visible in this view.
///
/// # Errors
///
/// Fails if `create_control_socket` fails. Per-request failures (`no
/// command`, `unable to parse command`, `got unknown command`, parameters
/// that are not a JSON object) are produced inside the request closure.
176 pub fn spawn(self) -> Result
<(), Error
> {
177 let control_future
= create_control_socket(self.socket
.to_owned(), self.gid
, move |param
| {
180 .ok_or_else(|| format_err
!("unable to parse parameters (expected json object)"))?
;
// Only a JSON string is accepted as command name.
182 let command
= match param
.get("command") {
183 Some(Value
::String(command
)) => command
.as_str(),
184 None
=> bail
!("no command"),
185 _
=> bail
!("unable to parse command"),
// NOTE(review): this `contains_key` check looks redundant - the `None` arm
// of the `get` lookup right below bails with the same message.
188 if !self.commands
.contains_key(command
) {
189 bail
!("got unknown command '{}'", command
);
192 match self.commands
.get(command
) {
193 None
=> bail
!("got unknown command '{}'", command
),
// `args` stays an Option: a missing entry is not replaced by a default.
195 let args
= param
.get("args"); //.unwrap_or(&Value::Null);
// Hand the socket future to the tokio runtime.
201 tokio
::spawn(control_future
);
206 /// Register a new command with a callback.
///
/// The handler is boxed and stored in the command table under `command`.
/// The parameter list is not visible in this view; the body uses `command`
/// as the map key and `handler` as the callback.
///
/// # Errors
///
/// Fails with `command '<name>' already exists!` if a handler is already
/// registered under `command`. That check runs before the insert.
207 pub fn register_command
<F
>(
211 ) -> Result
<(), Error
>
213 F
: Fn(Option
<&Value
>) -> Result
<Value
, Error
> + Send
+ Sync
+ '
static,
// Refuse to overwrite an existing registration.
216 if self.commands
.contains_key(&command
) {
217 bail
!("command '{}' already exists!", command
);
220 self.commands
.insert(command
, Box
::new(handler
));