4 use futures
::stream
::Stream
;
6 use tokio
::net
::unix
::UnixListener
;
7 use tokio
::io
::AsyncRead
;
11 use std
::path
::PathBuf
;
12 use serde_json
::Value
;
14 use std
::os
::unix
::io
::AsRawFd
;
17 use proxmox
::tools
::try_block
;
19 /// Listens on a Unix socket and handles simple commands asynchronously.
///
/// Binds a `UnixListener` at `path` (a bind failure is returned through `?`)
/// and builds a future over its incoming connections. For each connection the
/// peer credentials are read with `getsockopt`; the peer is rejected unless
/// its uid is 0 or its gid equals the gid of this process. Accepted
/// connections are split into a read half and a write half, the read half is
/// consumed line by line, each line is parsed as a JSON `Value`, and one reply
/// line starting with `OK: ` or `ERROR: ` is written back per command.
/// Accept, read and write failures are reported with `eprintln!`.
///
/// The listener future is combined with `super::last_worker_future()` through
/// `select`, and the combined outcome is mapped to `Ok(())`.
///
/// NOTE(review): the embedded line numbers jump in this excerpt, so some
/// statements are not visible here (for example where `f` is invoked and what
/// the function finally returns). Presumably `f` receives the parsed `Value`
/// and its result becomes the reply - confirm against the full file.
20 pub fn create_control_socket
<P
, F
>(path
: P
, f
: F
) -> Result
<impl Future
<Item
=(), Error
=()>, Error
>
21 where P
: Into
<PathBuf
>,
22 F
: Send
+ Sync
+'
static + Fn(Value
) -> Result
<Value
, Error
>,
24 let path
: PathBuf
= path
.into();
// Bind the listening socket; a bind failure is returned to the caller.
26 let socket
= UnixListener
::bind(&path
)?
;
// Keep the path behind an Arc so the logging closures below can each own a handle.
29 let path2
= Arc
::new(path
);
30 let path3
= path2
.clone();
32 let control_future
= socket
.incoming()
35 // check permissions (same gid, or root user)
36 let opt
= socket
::sockopt
::PeerCredentials {}
;
37 match socket
::getsockopt(conn
.as_raw_fd(), opt
) {
39 let mygid
= unsafe { libc::getgid() }
;
40 if !(cred
.uid() == 0 || cred
.gid() == mygid
) {
41 bail
!("no permissions for {:?}", cred
);
44 Err(err
) => bail
!("no permissions - unable to read peer credential - {}", err
),
// Accept failures are only logged to stderr; the error value itself is dropped.
48 .map_err(move |err
| { eprintln!("failed to accept on control socket {:?}
: {}
", path2, err); })
49 .for_each(move |conn| {
52 let (rx, mut tx) = conn.split();
53 let path = path3.clone();
54 let path2 = path3.clone();
// NOTE(review): this abort_future is created per connection, but its use is not visible in this excerpt.
56 let abort_future = super::last_worker_future().map_err(|_| {});
59 tokio::io::lines(std::io::BufReader::new(rx))
60 .map_err(move |err| { eprintln!("control socket {:?} read error: {}", path
, err
); })
61 .and_then(move |cmd
| {
62 let res
= try_block
!({
63 let param
= match cmd
.parse
::<Value
>() {
65 Err(err
) => bail
!("unable to parse json value - {}", err
),
// Encode the outcome as a single reply line with an OK or ERROR prefix.
71 let resp
= match res
{
72 Ok(v
) => format
!("OK: {}\n", v
),
73 Err(err
) => format
!("ERROR: {}\n", err
),
77 .for_each(move |resp
| {
78 tx
.write_all(resp
.as_bytes())
79 .map_err(|err
| { eprintln!("control socket {:?} write response error
: {}
", path2, err); })
82 .then(move |_| { Ok(()) })
// Combine the accept loop with last_worker_future via select and discard the outcome.
86 let abort_future = super::last_worker_future().map_err(|_| {});
87 let task = control_future.select(abort_future)
88 .then(move |_| { Ok(()) });
/// Sends one command to the control socket at `path` and decodes the reply.
///
/// Connects a `tokio::net::UnixStream` to `path`, writes `params.to_string()`
/// followed by a newline, and then shuts down the write half. The reply is
/// read line-wise from the read half. A line with the OK prefix has the text
/// after the prefix parsed as a JSON `Value`; a line with the ERROR prefix is
/// turned into an error carrying the text after the prefix; any other line is
/// reported as an unparsable response. Connect and write failures are wrapped
/// with `format_err!`, and read failures are converted with `Error::from`.
///
/// NOTE(review): the parameter list and several match arms are missing from
/// this excerpt (the embedded line numbers jump). Presumably `params` is a
/// `serde_json::Value`, the parsed value is returned on success, and the
/// no-response error covers a stream that ends without a line - confirm
/// against the full file.
94 pub fn send_command<P>(
97 ) -> impl Future<Item=Value, Error=Error>
98 where P: Into<PathBuf>,
101 let path: PathBuf = path.into();
103 tokio::net::UnixStream::connect(path)
104 .map_err(move |err| format_err!("control socket connect failed
- {}
", err))
105 .and_then(move |conn| {
107 let (rx, tx) = conn.split();
// One request is one line: the serialized params plus a trailing newline.
109 let mut command_string = params.to_string();
110 command_string.push('\n');
// Send the request, then shut down the write half.
112 tokio::io::write_all(tx, command_string)
113 .and_then(|(tx,_)| tokio::io::shutdown(tx))
114 .map_err(|err| format_err!("control socket write error
- {}
", err))
116 tokio::io::lines(std::io::BufReader::new(rx))
// A reply line was received: decode it by its prefix.
120 Ok((Some(data), _)) => {
121 if data.starts_with("OK
: ") {
// Skip the first four bytes (the prefix) and parse the remainder as JSON.
122 match data[4..].parse::<Value>() {
124 Err(err) => bail!("unable to parse json response
- {}
", err),
126 } else if data.starts_with("ERROR
: ") {
// Forward the text after the first seven bytes (the prefix) as the error message.
127 bail!("{}
", &data[7..]);
129 bail!("unable to parse response
: {}
", data);
133 bail!("no response
");
// A read failure is converted with Error::from.
135 Err((err, _)) => Err(Error::from(err)),