]>
Commit | Line | Data |
---|---|---|
78a39e05 DM |
1 | use failure::*; |
2 | ||
3 | use futures::*; | |
78a39e05 | 4 | |
db0cb9ce | 5 | use tokio::net::UnixListener; |
78a39e05 DM |
6 | |
7 | use std::path::PathBuf; | |
8 | use serde_json::Value; | |
9 | use std::sync::Arc; | |
59961b89 DM |
10 | use std::os::unix::io::AsRawFd; |
11 | use nix::sys::socket; | |
78a39e05 DM |
12 | |
13 | /// Listens on a Unix Socket to handle simple command asynchronously | |
db0cb9ce | 14 | pub fn create_control_socket<P, F>(path: P, func: F) -> Result<impl Future<Output = ()>, Error> |
160fc814 WB |
15 | where |
16 | P: Into<PathBuf>, | |
17 | F: Fn(Value) -> Result<Value, Error> + Send + Sync + 'static, | |
78a39e05 DM |
18 | { |
19 | let path: PathBuf = path.into(); | |
e2017536 | 20 | |
db0cb9ce | 21 | let mut socket = UnixListener::bind(&path)?; |
78a39e05 | 22 | |
db0cb9ce | 23 | let func = Arc::new(func); |
78a39e05 | 24 | |
db0cb9ce WB |
25 | let control_future = async move { |
26 | loop { | |
27 | let (conn, _addr) = socket | |
28 | .accept() | |
29 | .await | |
30 | .map_err(|err| { | |
31 | format_err!("failed to accept on control socket {:?}: {}", path, err) | |
32 | })?; | |
160fc814 | 33 | |
59961b89 DM |
34 | // check permissions (same gid, or root user) |
35 | let opt = socket::sockopt::PeerCredentials {}; | |
36 | match socket::getsockopt(conn.as_raw_fd(), opt) { | |
37 | Ok(cred) => { | |
38 | let mygid = unsafe { libc::getgid() }; | |
39 | if !(cred.uid() == 0 || cred.gid() == mygid) { | |
db0cb9ce | 40 | bail!("no permissions for {:?}", cred); |
59961b89 DM |
41 | } |
42 | } | |
db0cb9ce | 43 | Err(e) => bail!("no permissions - unable to read peer credential - {}", e), |
59961b89 | 44 | } |
78a39e05 | 45 | |
db0cb9ce | 46 | let (rx, mut tx) = tokio::io::split(conn); |
78a39e05 | 47 | |
160fc814 WB |
48 | let abort_future = super::last_worker_future().map(|_| ()); |
49 | ||
50 | use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; | |
db0cb9ce WB |
51 | let func = Arc::clone(&func); |
52 | let path = path.clone(); | |
160fc814 WB |
53 | tokio::spawn(futures::future::select( |
54 | async move { | |
55 | let mut rx = tokio::io::BufReader::new(rx); | |
56 | let mut line = String::new(); | |
57 | loop { | |
58 | line.clear(); | |
59 | match rx.read_line({ line.clear(); &mut line }).await { | |
60 | Ok(0) => break, | |
61 | Ok(_) => (), | |
62 | Err(err) => { | |
63 | eprintln!("control socket {:?} read error: {}", path, err); | |
64 | return; | |
65 | } | |
66 | } | |
78a39e05 | 67 | |
160fc814 | 68 | let response = match line.parse::<Value>() { |
db0cb9ce | 69 | Ok(param) => match func(param) { |
160fc814 WB |
70 | Ok(res) => format!("OK: {}\n", res), |
71 | Err(err) => format!("ERROR: {}\n", err), | |
72 | } | |
cfb2d3c1 DM |
73 | Err(err) => format!("ERROR: {}\n", err), |
74 | }; | |
160fc814 WB |
75 | |
76 | if let Err(err) = tx.write_all(response.as_bytes()).await { | |
77 | eprintln!("control socket {:?} write response error: {}", path, err); | |
78 | return; | |
79 | } | |
80 | } | |
81 | }.boxed(), | |
82 | abort_future, | |
83 | ).map(|_| ())); | |
db0cb9ce WB |
84 | } |
85 | }.boxed(); | |
78a39e05 DM |
86 | |
87 | let abort_future = super::last_worker_future().map_err(|_| {}); | |
160fc814 WB |
88 | let task = futures::future::select( |
89 | control_future, | |
90 | abort_future, | |
db0cb9ce | 91 | ).map(|_: futures::future::Either<(Result<(), Error>, _), _>| ()); |
78a39e05 DM |
92 | |
93 | Ok(task) | |
94 | } | |
321070b4 DM |
95 | |
96 | ||
97 | pub fn send_command<P>( | |
98 | path: P, | |
99 | params: Value | |
160fc814 | 100 | ) -> impl Future<Output = Result<Value, Error>> |
321070b4 DM |
101 | where P: Into<PathBuf>, |
102 | ||
103 | { | |
104 | let path: PathBuf = path.into(); | |
105 | ||
106 | tokio::net::UnixStream::connect(path) | |
107 | .map_err(move |err| format_err!("control socket connect failed - {}", err)) | |
db0cb9ce | 108 | .and_then(move |mut conn| { |
321070b4 DM |
109 | |
110 | let mut command_string = params.to_string(); | |
111 | command_string.push('\n'); | |
112 | ||
160fc814 WB |
113 | async move { |
114 | use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; | |
115 | ||
db0cb9ce WB |
116 | conn.write_all(command_string.as_bytes()).await?; |
117 | AsyncWriteExt::shutdown(&mut conn).await?; | |
118 | let mut rx = tokio::io::BufReader::new(conn); | |
160fc814 WB |
119 | let mut data = String::new(); |
120 | if rx.read_line(&mut data).await? == 0 { | |
121 | bail!("no response"); | |
122 | } | |
123 | if data.starts_with("OK: ") { | |
124 | match data[4..].parse::<Value>() { | |
125 | Ok(v) => Ok(v), | |
126 | Err(err) => bail!("unable to parse json response - {}", err), | |
127 | } | |
128 | } else if data.starts_with("ERROR: ") { | |
129 | bail!("{}", &data[7..]); | |
130 | } else { | |
131 | bail!("unable to parse response: {}", data); | |
132 | } | |
133 | } | |
321070b4 DM |
134 | }) |
135 | } |