]>
Commit | Line | Data |
---|---|---|
78a39e05 DM |
1 | use failure::*; |
2 | ||
3 | use futures::*; | |
4 | use futures::stream::Stream; | |
5 | ||
6 | use tokio::net::unix::UnixListener; | |
7 | use tokio::io::AsyncRead; | |
8 | ||
9 | use std::io::Write; | |
10 | ||
11 | use std::path::PathBuf; | |
12 | use serde_json::Value; | |
13 | use std::sync::Arc; | |
14 | ||
15 | /// Listens on a Unix Socket to handle simple command asynchronously | |
16 | pub fn create_control_socket<P, F>(path: P, f: F) -> Result<impl Future<Item=(), Error=()>, Error> | |
17 | where P: Into<PathBuf>, | |
18 | F: Send + Sync +'static + Fn(Value) -> Result<Value, Error>, | |
19 | { | |
20 | let path: PathBuf = path.into(); | |
21 | ||
22 | let socket = UnixListener::bind(&path)?; | |
23 | ||
24 | let f = Arc::new(f); | |
25 | let path = Arc::new(path); | |
26 | let path2 = path.clone(); | |
27 | let path3 = path.clone(); | |
28 | ||
29 | let control_future = socket.incoming() | |
30 | .map_err(move |err| { eprintln!("failed to accept on control socket {:?}: {}", path2, err); }) | |
31 | .for_each(move |conn| { | |
32 | let f1 = f.clone(); | |
33 | ||
34 | let (rx, mut tx) = conn.split(); | |
35 | let path = path3.clone(); | |
36 | let path2 = path3.clone(); | |
37 | ||
38 | tokio::io::lines(std::io::BufReader::new(rx)) | |
39 | .map_err(move |err| { eprintln!("control socket {:?} read error: {}", path, err); }) | |
40 | .and_then(move |cmd| { | |
41 | let res = try_block!({ | |
42 | let param = match cmd.parse::<Value>() { | |
43 | Ok(p) => p, | |
44 | Err(err) => bail!("ERRER {}", err), | |
45 | }; | |
46 | ||
47 | f1(param) | |
48 | }); | |
49 | ||
50 | let resp = match res { | |
51 | Ok(v) => format!("OK: {}\n", v), | |
52 | Err(err) => format!("ERROR: {}\n", err), | |
53 | }; | |
54 | Ok(resp) | |
55 | }) | |
56 | .for_each(move |resp| { | |
57 | tx.write_all(resp.as_bytes()) | |
58 | .map_err(|err| { eprintln!("control socket {:?} write response error: {}", path2, err); }) | |
59 | }) | |
60 | ||
61 | }); | |
62 | ||
63 | let abort_future = super::last_worker_future().map_err(|_| {}); | |
64 | let task = control_future.select(abort_future).map(|_| {}).map_err(|_| {}); | |
65 | ||
66 | Ok(task) | |
67 | } |