]> git.proxmox.com Git - proxmox-backup.git/blob - src/server/command_socket.rs
update to nix 0.14, use code from proxmox:tools
[proxmox-backup.git] / src / server / command_socket.rs
1 use failure::*;
2
3 use futures::*;
4 use futures::stream::Stream;
5
6 use tokio::net::unix::UnixListener;
7 use tokio::io::AsyncRead;
8
9 use std::io::Write;
10
11 use std::path::PathBuf;
12 use serde_json::Value;
13 use std::sync::Arc;
14 use std::os::unix::io::AsRawFd;
15 use nix::sys::socket;
16
17 use proxmox::tools::try_block;
18
19 /// Listens on a Unix Socket to handle simple command asynchronously
20 pub fn create_control_socket<P, F>(path: P, f: F) -> Result<impl Future<Item=(), Error=()>, Error>
21 where P: Into<PathBuf>,
22 F: Send + Sync +'static + Fn(Value) -> Result<Value, Error>,
23 {
24 let path: PathBuf = path.into();
25
26 let socket = UnixListener::bind(&path)?;
27
28 let f = Arc::new(f);
29 let path2 = Arc::new(path);
30 let path3 = path2.clone();
31
32 let control_future = socket.incoming()
33 .map_err(Error::from)
34 .and_then(|conn| {
35 // check permissions (same gid, or root user)
36 let opt = socket::sockopt::PeerCredentials {};
37 match socket::getsockopt(conn.as_raw_fd(), opt) {
38 Ok(cred) => {
39 let mygid = unsafe { libc::getgid() };
40 if !(cred.uid() == 0 || cred.gid() == mygid) {
41 bail!("no permissions for {:?}", cred);
42 }
43 }
44 Err(err) => bail!("no permissions - unable to read peer credential - {}", err),
45 }
46 Ok(conn)
47 })
48 .map_err(move |err| { eprintln!("failed to accept on control socket {:?}: {}", path2, err); })
49 .for_each(move |conn| {
50 let f1 = f.clone();
51
52 let (rx, mut tx) = conn.split();
53 let path = path3.clone();
54 let path2 = path3.clone();
55
56 let abort_future = super::last_worker_future().map_err(|_| {});
57
58 tokio::spawn(
59 tokio::io::lines(std::io::BufReader::new(rx))
60 .map_err(move |err| { eprintln!("control socket {:?} read error: {}", path, err); })
61 .and_then(move |cmd| {
62 let res = try_block!({
63 let param = match cmd.parse::<Value>() {
64 Ok(p) => p,
65 Err(err) => bail!("unable to parse json value - {}", err),
66 };
67
68 f1(param)
69 });
70
71 let resp = match res {
72 Ok(v) => format!("OK: {}\n", v),
73 Err(err) => format!("ERROR: {}\n", err),
74 };
75 Ok(resp)
76 })
77 .for_each(move |resp| {
78 tx.write_all(resp.as_bytes())
79 .map_err(|err| { eprintln!("control socket {:?} write response error: {}", path2, err); })
80 })
81 .select(abort_future)
82 .then(move |_| { Ok(()) })
83 )
84 });
85
86 let abort_future = super::last_worker_future().map_err(|_| {});
87 let task = control_future.select(abort_future)
88 .then(move |_| { Ok(()) });
89
90 Ok(task)
91 }
92
93
94 pub fn send_command<P>(
95 path: P,
96 params: Value
97 ) -> impl Future<Item=Value, Error=Error>
98 where P: Into<PathBuf>,
99
100 {
101 let path: PathBuf = path.into();
102
103 tokio::net::UnixStream::connect(path)
104 .map_err(move |err| format_err!("control socket connect failed - {}", err))
105 .and_then(move |conn| {
106
107 let (rx, tx) = conn.split();
108
109 let mut command_string = params.to_string();
110 command_string.push('\n');
111
112 tokio::io::write_all(tx, command_string)
113 .and_then(|(tx,_)| tokio::io::shutdown(tx))
114 .map_err(|err| format_err!("control socket write error - {}", err))
115 .and_then(move |_| {
116 tokio::io::lines(std::io::BufReader::new(rx))
117 .into_future()
118 .then(|test| {
119 match test {
120 Ok((Some(data), _)) => {
121 if data.starts_with("OK: ") {
122 match data[4..].parse::<Value>() {
123 Ok(v) => Ok(v),
124 Err(err) => bail!("unable to parse json response - {}", err),
125 }
126 } else if data.starts_with("ERROR: ") {
127 bail!("{}", &data[7..]);
128 } else {
129 bail!("unable to parse response: {}", data);
130 }
131 }
132 Ok((None, _)) => {
133 bail!("no response");
134 }
135 Err((err, _)) => Err(Error::from(err)),
136 }
137 })
138 })
139 })
140 }