]> git.proxmox.com Git - proxmox-backup.git/commitdiff
src/tools/broadcast_future.rs: add new constructor new_oneshot()
authorDietmar Maurer <dietmar@proxmox.com>
Sat, 27 Apr 2019 08:56:49 +0000 (10:56 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Sat, 27 Apr 2019 08:56:49 +0000 (10:56 +0200)
To simplify usage. Also added a test case.

Makefile
src/tools/broadcast_future.rs

index 6aed142c38e52d84737aacc7aecd1d28cabdd057..405e9ce9aca194d0061cf2653988e77525b7d111 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -47,7 +47,8 @@ $(SUBDIRS):
        $(MAKE) -C $@
 
 test:
-       cargo test $(CARGO_BUILD_ARGS)
+       cargo test test_broadcast_future
+       #cargo test $(CARGO_BUILD_ARGS)
 
 doc:
        cargo doc --no-deps $(CARGO_BUILD_ARGS)
index 917c343ed396e193261d0e147c0786fe9a259a06..87e19f0f6e2cdf20c4f4524ab90ac14274782a98 100644 (file)
@@ -29,6 +29,16 @@ impl <T: Clone + Send + 'static> BroadcastFuture<T> {
         Self { inner: Arc::new(Mutex::new(data)) }
     }
 
+    /// Creates a new instance with a oneshot channel as trigger
+    pub fn new_oneshot() -> (Self, oneshot::Sender<Result<T, Error>>) {
+
+        let (tx, rx) = oneshot::channel::<Result<T, Error>>();
+        let rx = rx.map_err(Error::from).flatten();
+        let test = Box::new(rx);
+
+        (Self::new(test), tx)
+    }
+
     fn update(inner: Arc<Mutex<BroadcastData<T>>>, result: Result<T, String>) {
         let mut data = inner.lock().unwrap();
 
@@ -84,3 +94,40 @@ impl <T: Clone + Send + 'static> BroadcastFuture<T> {
         futures::future::lazy(move || { Self::spawn(inner2) })
     }
 }
+
+#[test]
+fn test_broadcast_future() {
+    use std::sync::atomic::{AtomicUsize, Ordering};
+
+    static CHECKSUM: AtomicUsize  = AtomicUsize::new(0);
+
+    let (sender, trigger) = BroadcastFuture::new_oneshot();
+
+    let receiver1 = sender.listen()
+        .and_then(|res| {
+            CHECKSUM.fetch_add(res, Ordering::SeqCst);
+            Ok(())
+        })
+        .map_err(|err| { panic!("got errror {}", err); });
+
+    let receiver2 = sender.listen()
+        .and_then(|res| {
+            CHECKSUM.fetch_add(res*2, Ordering::SeqCst);
+            Ok(())
+        })
+        .map_err(|err| { panic!("got errror {}", err); });
+
+    tokio::run(futures::future::lazy(move || {
+
+        tokio::spawn(receiver1);
+        tokio::spawn(receiver2);
+
+        trigger.send(Ok(1)).unwrap();
+
+        Ok(())
+    }));
+
+    let result = CHECKSUM.load(Ordering::SeqCst);
+
+    assert!(result == 3);
+}