]> git.proxmox.com Git - proxmox-backup.git/blob - src/server/command_socket.rs
src/server/command_socket.rs: check control socket permissions
[proxmox-backup.git] / src / server / command_socket.rs
1 use failure::*;
2
3 use futures::*;
4 use futures::stream::Stream;
5
6 use tokio::net::unix::UnixListener;
7 use tokio::io::AsyncRead;
8
9 use std::io::Write;
10
11 use std::path::PathBuf;
12 use serde_json::Value;
13 use std::sync::Arc;
14 use std::os::unix::io::AsRawFd;
15 use nix::sys::socket;
16
17 /// Listens on a Unix Socket to handle simple command asynchronously
18 pub fn create_control_socket<P, F>(path: P, f: F) -> Result<impl Future<Item=(), Error=()>, Error>
19 where P: Into<PathBuf>,
20 F: Send + Sync +'static + Fn(Value) -> Result<Value, Error>,
21 {
22 let path: PathBuf = path.into();
23
24 let socket = UnixListener::bind(&path)?;
25
26 let f = Arc::new(f);
27 let path2 = Arc::new(path);
28 let path3 = path2.clone();
29
30 let control_future = socket.incoming()
31 .map_err(Error::from)
32 .and_then(|conn| {
33 // check permissions (same gid, or root user)
34 let opt = socket::sockopt::PeerCredentials {};
35 match socket::getsockopt(conn.as_raw_fd(), opt) {
36 Ok(cred) => {
37 let mygid = unsafe { libc::getgid() };
38 if !(cred.uid() == 0 || cred.gid() == mygid) {
39 bail!("no permissions for {:?}", cred);
40 }
41 }
42 Err(err) => bail!("no permissions - unable to read peer credential - {}", err),
43 }
44 Ok(conn)
45 })
46 .map_err(move |err| { eprintln!("failed to accept on control socket {:?}: {}", path2, err); })
47 .for_each(move |conn| {
48 let f1 = f.clone();
49
50 let (rx, mut tx) = conn.split();
51 let path = path3.clone();
52 let path2 = path3.clone();
53
54 let abort_future = super::last_worker_future().map_err(|_| {});
55
56 tokio::spawn(
57 tokio::io::lines(std::io::BufReader::new(rx))
58 .map_err(move |err| { eprintln!("control socket {:?} read error: {}", path, err); })
59 .and_then(move |cmd| {
60 let res = try_block!({
61 let param = match cmd.parse::<Value>() {
62 Ok(p) => p,
63 Err(err) => bail!("unable to parse json value - {}", err),
64 };
65
66 f1(param)
67 });
68
69 let resp = match res {
70 Ok(v) => format!("OK: {}\n", v),
71 Err(err) => format!("ERROR: {}\n", err),
72 };
73 Ok(resp)
74 })
75 .for_each(move |resp| {
76 tx.write_all(resp.as_bytes())
77 .map_err(|err| { eprintln!("control socket {:?} write response error: {}", path2, err); })
78 })
79 .select(abort_future)
80 .then(move |_| { Ok(()) })
81 )
82 });
83
84 let abort_future = super::last_worker_future().map_err(|_| {});
85 let task = control_future.select(abort_future)
86 .then(move |_| { Ok(()) });
87
88 Ok(task)
89 }
90
91
92 pub fn send_command<P>(
93 path: P,
94 params: Value
95 ) -> impl Future<Item=Value, Error=Error>
96 where P: Into<PathBuf>,
97
98 {
99 let path: PathBuf = path.into();
100
101 tokio::net::UnixStream::connect(path)
102 .map_err(move |err| format_err!("control socket connect failed - {}", err))
103 .and_then(move |conn| {
104
105 let (rx, tx) = conn.split();
106
107 let mut command_string = params.to_string();
108 command_string.push('\n');
109
110 tokio::io::write_all(tx, command_string)
111 .and_then(|(tx,_)| tokio::io::shutdown(tx))
112 .map_err(|err| format_err!("control socket write error - {}", err))
113 .and_then(move |_| {
114 tokio::io::lines(std::io::BufReader::new(rx))
115 .into_future()
116 .then(|test| {
117 match test {
118 Ok((Some(data), _)) => {
119 if data.starts_with("OK: ") {
120 match data[4..].parse::<Value>() {
121 Ok(v) => Ok(v),
122 Err(err) => bail!("unable to parse json response - {}", err),
123 }
124 } else if data.starts_with("ERROR: ") {
125 bail!("{}", &data[7..]);
126 } else {
127 bail!("unable to parse response: {}", data);
128 }
129 }
130 Ok((None, _)) => {
131 bail!("no response");
132 }
133 Err((err, _)) => Err(Error::from(err)),
134 }
135 })
136 })
137 })
138 }