F: Send + Sync +'static + Fn(Value) -> Result<Value, Error>,
{
let path: PathBuf = path.into();
- let path1: PathBuf = path.clone();
let socket = UnixListener::bind(&path)?;
let path = path3.clone();
let path2 = path3.clone();
- tokio::io::lines(std::io::BufReader::new(rx))
- .map_err(move |err| { eprintln!("control socket {:?} read error: {}", path, err); })
- .and_then(move |cmd| {
- let res = try_block!({
- let param = match cmd.parse::<Value>() {
- Ok(p) => p,
- Err(err) => bail!("unable to parse json value - {}", err),
- };
+ let abort_future = super::last_worker_future().map_err(|_| {});
- f1(param)
- });
+ tokio::spawn(
+ tokio::io::lines(std::io::BufReader::new(rx))
+ .map_err(move |err| { eprintln!("control socket {:?} read error: {}", path, err); })
+ .and_then(move |cmd| {
+ let res = try_block!({
+ let param = match cmd.parse::<Value>() {
+ Ok(p) => p,
+ Err(err) => bail!("unable to parse json value - {}", err),
+ };
- let resp = match res {
- Ok(v) => format!("OK: {}\n", v),
- Err(err) => format!("ERROR: {}\n", err),
- };
- Ok(resp)
- })
- .for_each(move |resp| {
- tx.write_all(resp.as_bytes())
- .map_err(|err| { eprintln!("control socket {:?} write response error: {}", path2, err); })
- })
+ f1(param)
+ });
+ let resp = match res {
+ Ok(v) => format!("OK: {}\n", v),
+ Err(err) => format!("ERROR: {}\n", err),
+ };
+ Ok(resp)
+ })
+ .for_each(move |resp| {
+ tx.write_all(resp.as_bytes())
+ .map_err(|err| { eprintln!("control socket {:?} write response error: {}", path2, err); })
+ })
+ .select(abort_future)
+ .then(move |_| { Ok(()) })
+ )
});
let abort_future = super::last_worker_future().map_err(|_| {});