4 use futures
::stream
::Stream
;
6 use tokio
::net
::unix
::UnixListener
;
7 use tokio
::io
::AsyncRead
;
11 use std
::path
::PathBuf
;
12 use serde_json
::Value
;
14 use std
::os
::unix
::io
::AsRawFd
;
17 /// Listens on a Unix Socket to handle simple command asynchronously
18 pub fn create_control_socket
<P
, F
>(path
: P
, f
: F
) -> Result
<impl Future
<Item
=(), Error
=()>, Error
>
19 where P
: Into
<PathBuf
>,
20 F
: Send
+ Sync
+'
static + Fn(Value
) -> Result
<Value
, Error
>,
22 let path
: PathBuf
= path
.into();
24 let socket
= UnixListener
::bind(&path
)?
;
27 let path2
= Arc
::new(path
);
28 let path3
= path2
.clone();
30 let control_future
= socket
.incoming()
33 // check permissions (same gid, or root user)
34 let opt
= socket
::sockopt
::PeerCredentials {}
;
35 match socket
::getsockopt(conn
.as_raw_fd(), opt
) {
37 let mygid
= unsafe { libc::getgid() }
;
38 if !(cred
.uid() == 0 || cred
.gid() == mygid
) {
39 bail
!("no permissions for {:?}", cred
);
42 Err(err
) => bail
!("no permissions - unable to read peer credential - {}", err
),
46 .map_err(move |err
| { eprintln!("failed to accept on control socket {:?}
: {}
", path2, err); })
47 .for_each(move |conn| {
50 let (rx, mut tx) = conn.split();
51 let path = path3.clone();
52 let path2 = path3.clone();
54 let abort_future = super::last_worker_future().map_err(|_| {});
57 tokio::io::lines(std::io::BufReader::new(rx))
58 .map_err(move |err| { eprintln!("control socket {:?} read error: {}", path
, err
); })
59 .and_then(move |cmd
| {
60 let res
= try_block
!({
61 let param
= match cmd
.parse
::<Value
>() {
63 Err(err
) => bail
!("unable to parse json value - {}", err
),
69 let resp
= match res
{
70 Ok(v
) => format
!("OK: {}\n", v
),
71 Err(err
) => format
!("ERROR: {}\n", err
),
75 .for_each(move |resp
| {
76 tx
.write_all(resp
.as_bytes())
77 .map_err(|err
| { eprintln!("control socket {:?} write response error
: {}
", path2, err); })
80 .then(move |_| { Ok(()) })
84 let abort_future = super::last_worker_future().map_err(|_| {});
85 let task = control_future.select(abort_future)
86 .then(move |_| { Ok(()) });
92 pub fn send_command<P>(
95 ) -> impl Future<Item=Value, Error=Error>
96 where P: Into<PathBuf>,
99 let path: PathBuf = path.into();
101 tokio::net::UnixStream::connect(path)
102 .map_err(move |err| format_err!("control socket connect failed
- {}
", err))
103 .and_then(move |conn| {
105 let (rx, tx) = conn.split();
107 let mut command_string = params.to_string();
108 command_string.push('\n');
110 tokio::io::write_all(tx, command_string)
111 .and_then(|(tx,_)| tokio::io::shutdown(tx))
112 .map_err(|err| format_err!("control socket write error
- {}
", err))
114 tokio::io::lines(std::io::BufReader::new(rx))
118 Ok((Some(data), _)) => {
119 if data.starts_with("OK
: ") {
120 match data[4..].parse::<Value>() {
122 Err(err) => bail!("unable to parse json response
- {}
", err),
124 } else if data.starts_with("ERROR
: ") {
125 bail!("{}
", &data[7..]);
127 bail!("unable to parse response
: {}
", data);
131 bail!("no response
");
133 Err((err, _)) => Err(Error::from(err)),