From 3dceb9b3046713f3f036f67e0b1d190f530b84d7 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Sat, 27 Apr 2019 10:56:49 +0200 Subject: [PATCH] src/tools/broadcast_future.rs: add new constructor new_oneshot() To simplify usage. Also added a test case. --- Makefile | 3 ++- src/tools/broadcast_future.rs | 47 +++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 6aed142c..405e9ce9 100644 --- a/Makefile +++ b/Makefile @@ -47,7 +47,8 @@ $(SUBDIRS): $(MAKE) -C $@ test: - cargo test $(CARGO_BUILD_ARGS) + cargo test test_broadcast_future + #cargo test $(CARGO_BUILD_ARGS) doc: cargo doc --no-deps $(CARGO_BUILD_ARGS) diff --git a/src/tools/broadcast_future.rs b/src/tools/broadcast_future.rs index 917c343e..87e19f0f 100644 --- a/src/tools/broadcast_future.rs +++ b/src/tools/broadcast_future.rs @@ -29,6 +29,16 @@ impl BroadcastFuture { Self { inner: Arc::new(Mutex::new(data)) } } + /// Creates a new instance with a oneshot channel as trigger + pub fn new_oneshot() -> (Self, oneshot::Sender>) { + + let (tx, rx) = oneshot::channel::>(); + let rx = rx.map_err(Error::from).flatten(); + let test = Box::new(rx); + + (Self::new(test), tx) + } + fn update(inner: Arc>>, result: Result) { let mut data = inner.lock().unwrap(); @@ -84,3 +94,40 @@ impl BroadcastFuture { futures::future::lazy(move || { Self::spawn(inner2) }) } } + +#[test] +fn test_broadcast_future() { + use std::sync::atomic::{AtomicUsize, Ordering}; + + static CHECKSUM: AtomicUsize = AtomicUsize::new(0); + + let (sender, trigger) = BroadcastFuture::new_oneshot(); + + let receiver1 = sender.listen() + .and_then(|res| { + CHECKSUM.fetch_add(res, Ordering::SeqCst); + Ok(()) + }) + .map_err(|err| { panic!("got errror {}", err); }); + + let receiver2 = sender.listen() + .and_then(|res| { + CHECKSUM.fetch_add(res*2, Ordering::SeqCst); + Ok(()) + }) + .map_err(|err| { panic!("got errror {}", err); }); + + tokio::run(futures::future::lazy(move || { + + tokio::spawn(receiver1); + tokio::spawn(receiver2); + + trigger.send(Ok(1)).unwrap(); + + Ok(()) + })); + + let result = CHECKSUM.load(Ordering::SeqCst); + + assert!(result == 3); +} -- 2.39.5