Self { inner: Arc::new(Mutex::new(data)) }
}
+ /// Creates a new instance with a oneshot channel as trigger
+ pub fn new_oneshot() -> (Self, oneshot::Sender<Result<T, Error>>) {
+
+ let (tx, rx) = oneshot::channel::<Result<T, Error>>();
+ let rx = rx.map_err(Error::from).flatten();
+ let test = Box::new(rx);
+
+ (Self::new(test), tx)
+ }
+
fn update(inner: Arc<Mutex<BroadcastData<T>>>, result: Result<T, String>) {
let mut data = inner.lock().unwrap();
futures::future::lazy(move || { Self::spawn(inner2) })
}
}
+
+#[test]
+fn test_broadcast_future() {
+ use std::sync::atomic::{AtomicUsize, Ordering};
+
+ static CHECKSUM: AtomicUsize = AtomicUsize::new(0);
+
+ let (sender, trigger) = BroadcastFuture::new_oneshot();
+
+ let receiver1 = sender.listen()
+ .and_then(|res| {
+ CHECKSUM.fetch_add(res, Ordering::SeqCst);
+ Ok(())
+ })
+ .map_err(|err| { panic!("got errror {}", err); });
+
+ let receiver2 = sender.listen()
+ .and_then(|res| {
+ CHECKSUM.fetch_add(res*2, Ordering::SeqCst);
+ Ok(())
+ })
+ .map_err(|err| { panic!("got errror {}", err); });
+
+ tokio::run(futures::future::lazy(move || {
+
+ tokio::spawn(receiver1);
+ tokio::spawn(receiver2);
+
+ trigger.send(Ok(1)).unwrap();
+
+ Ok(())
+ }));
+
+ let result = CHECKSUM.load(Ordering::SeqCst);
+
+ assert!(result == 3);
+}