]> git.proxmox.com Git - proxmox-backup.git/blame - src/server/command_socket.rs
src/server/upid.rs: moved code into separate file
[proxmox-backup.git] / src / server / command_socket.rs
CommitLineData
78a39e05
DM
1use failure::*;
2
3use futures::*;
4use futures::stream::Stream;
5
6use tokio::net::unix::UnixListener;
7use tokio::io::AsyncRead;
8
9use std::io::Write;
10
11use std::path::PathBuf;
12use serde_json::Value;
13use std::sync::Arc;
14
15/// Listens on a Unix Socket to handle simple command asynchronously
16pub fn create_control_socket<P, F>(path: P, f: F) -> Result<impl Future<Item=(), Error=()>, Error>
17 where P: Into<PathBuf>,
18 F: Send + Sync +'static + Fn(Value) -> Result<Value, Error>,
19{
20 let path: PathBuf = path.into();
21
22 let socket = UnixListener::bind(&path)?;
23
24 let f = Arc::new(f);
25 let path = Arc::new(path);
26 let path2 = path.clone();
27 let path3 = path.clone();
28
29 let control_future = socket.incoming()
30 .map_err(move |err| { eprintln!("failed to accept on control socket {:?}: {}", path2, err); })
31 .for_each(move |conn| {
32 let f1 = f.clone();
33
34 let (rx, mut tx) = conn.split();
35 let path = path3.clone();
36 let path2 = path3.clone();
37
38 tokio::io::lines(std::io::BufReader::new(rx))
39 .map_err(move |err| { eprintln!("control socket {:?} read error: {}", path, err); })
40 .and_then(move |cmd| {
41 let res = try_block!({
42 let param = match cmd.parse::<Value>() {
43 Ok(p) => p,
44 Err(err) => bail!("ERRER {}", err),
45 };
46
47 f1(param)
48 });
49
50 let resp = match res {
51 Ok(v) => format!("OK: {}\n", v),
52 Err(err) => format!("ERROR: {}\n", err),
53 };
54 Ok(resp)
55 })
56 .for_each(move |resp| {
57 tx.write_all(resp.as_bytes())
58 .map_err(|err| { eprintln!("control socket {:?} write response error: {}", path2, err); })
59 })
60
61 });
62
63 let abort_future = super::last_worker_future().map_err(|_| {});
64 let task = control_future.select(abort_future).map(|_| {}).map_err(|_| {});
65
66 Ok(task)
67}