5 use tokio
::net
::unix
::UnixListener
;
7 use std
::path
::PathBuf
;
10 use std
::os
::unix
::io
::AsRawFd
;
13 /// Listens on a Unix Socket to handle simple command asynchronously
14 pub fn create_control_socket
<P
, F
>(path
: P
, f
: F
) -> Result
<impl Future
<Output
= ()>, Error
>
17 F
: Fn(Value
) -> Result
<Value
, Error
> + Send
+ Sync
+ '
static,
19 let path
: PathBuf
= path
.into();
21 let socket
= UnixListener
::bind(&path
)?
;
24 let path2
= Arc
::new(path
);
25 let path3
= path2
.clone();
27 let control_future
= socket
.incoming()
30 use futures
::future
::{err, ok}
;
32 // check permissions (same gid, or root user)
33 let opt
= socket
::sockopt
::PeerCredentials {}
;
34 match socket
::getsockopt(conn
.as_raw_fd(), opt
) {
36 let mygid
= unsafe { libc::getgid() }
;
37 if !(cred
.uid() == 0 || cred
.gid() == mygid
) {
38 return err(format_err
!("no permissions for {:?}", cred
));
42 return err(format_err
!(
43 "no permissions - unable to read peer credential - {}",
50 .map_err(move |err
| { eprintln!("failed to accept on control socket {:?}
: {}
", path2, err); })
51 .try_for_each(move |conn| {
52 let f = Arc::clone(&f);
54 let (rx, mut tx) = conn.split();
55 let path = path3.clone();
57 let abort_future = super::last_worker_future().map(|_| ());
59 use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
60 tokio::spawn(futures::future::select(
62 let mut rx = tokio::io::BufReader::new(rx);
63 let mut line = String::new();
66 match rx.read_line({ line.clear(); &mut line }).await {
70 eprintln!("control socket {:?} read error
: {}
", path, err);
75 let response = match line.parse::<Value>() {
76 Ok(param) => match f(param) {
77 Ok(res) => format!("OK
: {}
\n", res),
78 Err(err) => format!("ERROR
: {}
\n", err),
80 Err(err) => format!("ERROR
: {}
\n", err),
83 if let Err(err) = tx.write_all(response.as_bytes()).await {
84 eprintln!("control socket {:?} write response error
: {}
", path, err);
91 futures::future::ok(())
94 let abort_future = super::last_worker_future().map_err(|_| {});
95 let task = futures::future::select(
104 pub fn send_command<P>(
107 ) -> impl Future<Output = Result<Value, Error>>
108 where P: Into<PathBuf>,
111 let path: PathBuf = path.into();
113 tokio::net::UnixStream::connect(path)
114 .map_err(move |err| format_err!("control socket connect failed
- {}
", err))
115 .and_then(move |conn| {
117 let (rx, mut tx) = conn.split();
119 let mut command_string = params.to_string();
120 command_string.push('\n');
123 use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
125 tx.write_all(command_string.as_bytes()).await?;
126 tx.shutdown().await?;
127 let mut rx = tokio::io::BufReader::new(rx);
128 let mut data = String::new();
129 if rx.read_line(&mut data).await? == 0 {
130 bail!("no response
");
132 if data.starts_with("OK
: ") {
133 match data[4..].parse::<Value>() {
135 Err(err) => bail!("unable to parse json response
- {}
", err),
137 } else if data.starts_with("ERROR
: ") {
138 bail!("{}
", &data[7..]);
140 bail!("unable to parse response
: {}
", data);