]>
Commit | Line | Data |
---|---|---|
f7d4e4b5 | 1 | use anyhow::{bail, format_err, Error}; |
78a39e05 | 2 | |
f3df613c TL |
3 | use std::collections::HashMap; |
4 | use std::os::unix::io::AsRawFd; | |
45b8a032 | 5 | use std::path::{PathBuf, Path}; |
f3df613c | 6 | use std::sync::Arc; |
78a39e05 | 7 | |
f3df613c | 8 | use futures::*; |
db0cb9ce | 9 | use tokio::net::UnixListener; |
45b8a032 | 10 | use serde::Serialize; |
78a39e05 | 11 | use serde_json::Value; |
59961b89 | 12 | use nix::sys::socket; |
78a39e05 DM |
13 | |
14 | /// Listens on a Unix Socket to handle simple command asynchronously | |
b469011f | 15 | fn create_control_socket<P, F>(path: P, func: F) -> Result<impl Future<Output = ()>, Error> |
160fc814 WB |
16 | where |
17 | P: Into<PathBuf>, | |
18 | F: Fn(Value) -> Result<Value, Error> + Send + Sync + 'static, | |
78a39e05 DM |
19 | { |
20 | let path: PathBuf = path.into(); | |
e2017536 | 21 | |
197de83f DM |
22 | let backup_user = crate::backup::backup_user()?; |
23 | let backup_gid = backup_user.gid.as_raw(); | |
24 | ||
0bfcea6a | 25 | let socket = UnixListener::bind(&path)?; |
78a39e05 | 26 | |
db0cb9ce | 27 | let func = Arc::new(func); |
78a39e05 | 28 | |
db0cb9ce WB |
29 | let control_future = async move { |
30 | loop { | |
197de83f DM |
31 | let (conn, _addr) = match socket.accept().await { |
32 | Ok(data) => data, | |
33 | Err(err) => { | |
34 | eprintln!("failed to accept on control socket {:?}: {}", path, err); | |
35 | continue; | |
36 | } | |
37 | }; | |
38 | ||
59961b89 | 39 | let opt = socket::sockopt::PeerCredentials {}; |
197de83f DM |
40 | let cred = match socket::getsockopt(conn.as_raw_fd(), opt) { |
41 | Ok(cred) => cred, | |
42 | Err(err) => { | |
43 | eprintln!("no permissions - unable to read peer credential - {}", err); | |
44 | continue; | |
59961b89 | 45 | } |
197de83f DM |
46 | }; |
47 | ||
48 | // check permissions (same gid, root user, or backup group) | |
49 | let mygid = unsafe { libc::getgid() }; | |
50 | if !(cred.uid() == 0 || cred.gid() == mygid || cred.gid() == backup_gid) { | |
51 | eprintln!("no permissions for {:?}", cred); | |
52 | continue; | |
59961b89 | 53 | } |
78a39e05 | 54 | |
db0cb9ce | 55 | let (rx, mut tx) = tokio::io::split(conn); |
78a39e05 | 56 | |
160fc814 WB |
57 | let abort_future = super::last_worker_future().map(|_| ()); |
58 | ||
59 | use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; | |
db0cb9ce WB |
60 | let func = Arc::clone(&func); |
61 | let path = path.clone(); | |
160fc814 WB |
62 | tokio::spawn(futures::future::select( |
63 | async move { | |
64 | let mut rx = tokio::io::BufReader::new(rx); | |
65 | let mut line = String::new(); | |
66 | loop { | |
67 | line.clear(); | |
68 | match rx.read_line({ line.clear(); &mut line }).await { | |
69 | Ok(0) => break, | |
70 | Ok(_) => (), | |
71 | Err(err) => { | |
72 | eprintln!("control socket {:?} read error: {}", path, err); | |
73 | return; | |
74 | } | |
75 | } | |
78a39e05 | 76 | |
160fc814 | 77 | let response = match line.parse::<Value>() { |
db0cb9ce | 78 | Ok(param) => match func(param) { |
160fc814 WB |
79 | Ok(res) => format!("OK: {}\n", res), |
80 | Err(err) => format!("ERROR: {}\n", err), | |
81 | } | |
cfb2d3c1 DM |
82 | Err(err) => format!("ERROR: {}\n", err), |
83 | }; | |
160fc814 WB |
84 | |
85 | if let Err(err) = tx.write_all(response.as_bytes()).await { | |
86 | eprintln!("control socket {:?} write response error: {}", path, err); | |
87 | return; | |
88 | } | |
89 | } | |
90 | }.boxed(), | |
91 | abort_future, | |
92 | ).map(|_| ())); | |
db0cb9ce WB |
93 | } |
94 | }.boxed(); | |
78a39e05 DM |
95 | |
96 | let abort_future = super::last_worker_future().map_err(|_| {}); | |
160fc814 WB |
97 | let task = futures::future::select( |
98 | control_future, | |
99 | abort_future, | |
db0cb9ce | 100 | ).map(|_: futures::future::Either<(Result<(), Error>, _), _>| ()); |
78a39e05 DM |
101 | |
102 | Ok(task) | |
103 | } | |
321070b4 DM |
104 | |
105 | ||
45b8a032 WB |
106 | pub async fn send_command<P, T>(path: P, params: &T) -> Result<Value, Error> |
107 | where | |
108 | P: AsRef<Path>, | |
109 | T: ?Sized + Serialize, | |
321070b4 | 110 | { |
45b8a032 WB |
111 | let mut command_string = serde_json::to_string(params)?; |
112 | command_string.push('\n'); | |
113 | send_raw_command(path.as_ref(), &command_string).await | |
114 | } | |
321070b4 | 115 | |
45b8a032 WB |
116 | pub async fn send_raw_command<P>(path: P, command_string: &str) -> Result<Value, Error> |
117 | where | |
118 | P: AsRef<Path>, | |
119 | { | |
120 | use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; | |
160fc814 | 121 | |
45b8a032 WB |
122 | let mut conn = tokio::net::UnixStream::connect(path) |
123 | .map_err(move |err| format_err!("control socket connect failed - {}", err)) | |
124 | .await?; | |
125 | ||
126 | conn.write_all(command_string.as_bytes()).await?; | |
127 | if !command_string.as_bytes().ends_with(b"\n") { | |
128 | conn.write_all(b"\n").await?; | |
129 | } | |
130 | ||
131 | AsyncWriteExt::shutdown(&mut conn).await?; | |
132 | let mut rx = tokio::io::BufReader::new(conn); | |
133 | let mut data = String::new(); | |
134 | if rx.read_line(&mut data).await? == 0 { | |
135 | bail!("no response"); | |
136 | } | |
137 | if let Some(res) = data.strip_prefix("OK: ") { | |
138 | match res.parse::<Value>() { | |
139 | Ok(v) => Ok(v), | |
140 | Err(err) => bail!("unable to parse json response - {}", err), | |
141 | } | |
142 | } else if let Some(err) = data.strip_prefix("ERROR: ") { | |
143 | bail!("{}", err); | |
144 | } else { | |
145 | bail!("unable to parse response: {}", data); | |
146 | } | |
321070b4 | 147 | } |
f3df613c TL |
148 | |
149 | /// A callback for a specific commando socket. | |
150 | pub type CommandoSocketFn = Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>; | |
151 | ||
152 | /// Tooling to get a single control command socket where one can register multiple commands | |
153 | /// dynamically. | |
154 | /// You need to call `spawn()` to make the socket active. | |
155 | pub struct CommandoSocket { | |
156 | socket: PathBuf, | |
157 | commands: HashMap<String, CommandoSocketFn>, | |
158 | } | |
159 | ||
160 | impl CommandoSocket { | |
161 | pub fn new<P>(path: P) -> Self | |
162 | where P: Into<PathBuf>, | |
163 | { | |
164 | CommandoSocket { | |
165 | socket: path.into(), | |
166 | commands: HashMap::new(), | |
167 | } | |
168 | } | |
169 | ||
170 | /// Spawn the socket and consume self, meaning you cannot register commands anymore after | |
171 | /// calling this. | |
172 | pub fn spawn(self) -> Result<(), Error> { | |
173 | let control_future = create_control_socket(self.socket.to_owned(), move |param| { | |
174 | let param = param | |
175 | .as_object() | |
176 | .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?; | |
177 | ||
178 | let command = match param.get("command") { | |
179 | Some(Value::String(command)) => command.as_str(), | |
180 | None => bail!("no command"), | |
181 | _ => bail!("unable to parse command"), | |
182 | }; | |
183 | ||
184 | if !self.commands.contains_key(command) { | |
185 | bail!("got unknown command '{}'", command); | |
186 | } | |
187 | ||
188 | match self.commands.get(command) { | |
189 | None => bail!("got unknown command '{}'", command), | |
190 | Some(handler) => { | |
191 | let args = param.get("args"); //.unwrap_or(&Value::Null); | |
192 | (handler)(args) | |
193 | }, | |
194 | } | |
195 | })?; | |
196 | ||
197 | tokio::spawn(control_future); | |
198 | ||
199 | Ok(()) | |
200 | } | |
201 | ||
202 | /// Register a new command with a callback. | |
203 | pub fn register_command<F>( | |
204 | &mut self, | |
205 | command: String, | |
206 | handler: F, | |
207 | ) -> Result<(), Error> | |
208 | where | |
209 | F: Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static, | |
210 | { | |
211 | ||
212 | if self.commands.contains_key(&command) { | |
213 | bail!("command '{}' already exists!", command); | |
214 | } | |
215 | ||
216 | self.commands.insert(command, Box::new(handler)); | |
217 | ||
218 | Ok(()) | |
219 | } | |
220 | } |