]> git.proxmox.com Git - proxmox-backup.git/blob - src/server/command_socket.rs
src/server/command_socket.rs: switch to async
[proxmox-backup.git] / src / server / command_socket.rs
1 use failure::*;
2
3 use futures::*;
4
5 use tokio::net::unix::UnixListener;
6
7 use std::path::PathBuf;
8 use serde_json::Value;
9 use std::sync::Arc;
10 use std::os::unix::io::AsRawFd;
11 use nix::sys::socket;
12
13 /// Listens on a Unix Socket to handle simple command asynchronously
14 pub fn create_control_socket<P, F>(path: P, f: F) -> Result<impl Future<Output = ()>, Error>
15 where
16 P: Into<PathBuf>,
17 F: Fn(Value) -> Result<Value, Error> + Send + Sync + 'static,
18 {
19 let path: PathBuf = path.into();
20
21 let socket = UnixListener::bind(&path)?;
22
23 let f = Arc::new(f);
24 let path2 = Arc::new(path);
25 let path3 = path2.clone();
26
27 let control_future = socket.incoming()
28 .map_err(Error::from)
29 .and_then(|conn| {
30 use futures::future::{err, ok};
31
32 // check permissions (same gid, or root user)
33 let opt = socket::sockopt::PeerCredentials {};
34 match socket::getsockopt(conn.as_raw_fd(), opt) {
35 Ok(cred) => {
36 let mygid = unsafe { libc::getgid() };
37 if !(cred.uid() == 0 || cred.gid() == mygid) {
38 return err(format_err!("no permissions for {:?}", cred));
39 }
40 }
41 Err(e) => {
42 return err(format_err!(
43 "no permissions - unable to read peer credential - {}",
44 e,
45 ));
46 }
47 }
48 ok(conn)
49 })
50 .map_err(move |err| { eprintln!("failed to accept on control socket {:?}: {}", path2, err); })
51 .try_for_each(move |conn| {
52 let f = Arc::clone(&f);
53
54 let (rx, mut tx) = conn.split();
55 let path = path3.clone();
56
57 let abort_future = super::last_worker_future().map(|_| ());
58
59 use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
60 tokio::spawn(futures::future::select(
61 async move {
62 let mut rx = tokio::io::BufReader::new(rx);
63 let mut line = String::new();
64 loop {
65 line.clear();
66 match rx.read_line({ line.clear(); &mut line }).await {
67 Ok(0) => break,
68 Ok(_) => (),
69 Err(err) => {
70 eprintln!("control socket {:?} read error: {}", path, err);
71 return;
72 }
73 }
74
75 let response = match line.parse::<Value>() {
76 Ok(param) => match f(param) {
77 Ok(res) => format!("OK: {}\n", res),
78 Err(err) => format!("ERROR: {}\n", err),
79 }
80 Err(err) => format!("ERROR: {}\n", err),
81 };
82
83 if let Err(err) = tx.write_all(response.as_bytes()).await {
84 eprintln!("control socket {:?} write response error: {}", path, err);
85 return;
86 }
87 }
88 }.boxed(),
89 abort_future,
90 ).map(|_| ()));
91 futures::future::ok(())
92 });
93
94 let abort_future = super::last_worker_future().map_err(|_| {});
95 let task = futures::future::select(
96 control_future,
97 abort_future,
98 ).map(|_| ());
99
100 Ok(task)
101 }
102
103
104 pub fn send_command<P>(
105 path: P,
106 params: Value
107 ) -> impl Future<Output = Result<Value, Error>>
108 where P: Into<PathBuf>,
109
110 {
111 let path: PathBuf = path.into();
112
113 tokio::net::UnixStream::connect(path)
114 .map_err(move |err| format_err!("control socket connect failed - {}", err))
115 .and_then(move |conn| {
116
117 let (rx, mut tx) = conn.split();
118
119 let mut command_string = params.to_string();
120 command_string.push('\n');
121
122 async move {
123 use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
124
125 tx.write_all(command_string.as_bytes()).await?;
126 tx.shutdown().await?;
127 let mut rx = tokio::io::BufReader::new(rx);
128 let mut data = String::new();
129 if rx.read_line(&mut data).await? == 0 {
130 bail!("no response");
131 }
132 if data.starts_with("OK: ") {
133 match data[4..].parse::<Value>() {
134 Ok(v) => Ok(v),
135 Err(err) => bail!("unable to parse json response - {}", err),
136 }
137 } else if data.starts_with("ERROR: ") {
138 bail!("{}", &data[7..]);
139 } else {
140 bail!("unable to parse response: {}", data);
141 }
142 }
143 })
144 }