]> git.proxmox.com Git - proxmox-backup.git/commitdiff
src/server/command_socket.rs: correctly handle/spawn handle parallel connections
authorDietmar Maurer <dietmar@proxmox.com>
Wed, 10 Apr 2019 09:05:00 +0000 (11:05 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Wed, 10 Apr 2019 09:05:00 +0000 (11:05 +0200)
src/server/command_socket.rs

index 814d160b31bae4fee7dddc4ece8ce534fb53f684..92a1322cd97a49861bc9a50e659fc49ea71b3881 100644 (file)
@@ -18,7 +18,6 @@ pub fn create_control_socket<P, F>(path: P, f: F) -> Result<impl Future<Item=(),
           F: Send + Sync +'static + Fn(Value) -> Result<Value, Error>,
 {
     let path: PathBuf = path.into();
-    let path1: PathBuf = path.clone();
 
     let socket = UnixListener::bind(&path)?;
 
@@ -35,29 +34,34 @@ pub fn create_control_socket<P, F>(path: P, f: F) -> Result<impl Future<Item=(),
             let path = path3.clone();
             let path2 = path3.clone();
 
-            tokio::io::lines(std::io::BufReader::new(rx))
-                .map_err(move |err| { eprintln!("control socket {:?} read error: {}", path, err); })
-                .and_then(move |cmd| {
-                    let res = try_block!({
-                        let param = match cmd.parse::<Value>() {
-                            Ok(p) => p,
-                            Err(err) => bail!("unable to parse json value - {}", err),
-                        };
+            let abort_future = super::last_worker_future().map_err(|_| {});
 
-                        f1(param)
-                    });
+            tokio::spawn(
+                tokio::io::lines(std::io::BufReader::new(rx))
+                    .map_err(move |err| { eprintln!("control socket {:?} read error: {}", path, err); })
+                    .and_then(move |cmd| {
+                        let res = try_block!({
+                            let param = match cmd.parse::<Value>() {
+                                Ok(p) => p,
+                                Err(err) => bail!("unable to parse json value - {}", err),
+                            };
 
-                    let resp = match res {
-                        Ok(v) => format!("OK: {}\n", v),
-                        Err(err) => format!("ERROR: {}\n", err),
-                    };
-                    Ok(resp)
-                })
-                .for_each(move |resp| {
-                    tx.write_all(resp.as_bytes())
-                        .map_err(|err| { eprintln!("control socket {:?} write response error: {}", path2, err); })
-                })
+                            f1(param)
+                        });
 
+                        let resp = match res {
+                            Ok(v) => format!("OK: {}\n", v),
+                            Err(err) => format!("ERROR: {}\n", err),
+                        };
+                        Ok(resp)
+                    })
+                    .for_each(move |resp| {
+                        tx.write_all(resp.as_bytes())
+                            .map_err(|err| { eprintln!("control socket {:?} write response error: {}", path2, err); })
+                    })
+                    .select(abort_future)
+                    .then(move |_| { Ok(()) })
+            )
         });
 
     let abort_future = super::last_worker_future().map_err(|_| {});