1 use std
::future
::Future
;
3 use std
::sync
::{Arc, Mutex}
;
5 use anyhow
::{format_err, Error}
;
6 use futures
::future
::{FutureExt, TryFutureExt}
;
7 use tokio
::sync
::oneshot
;
9 /// Broadcast results to registered listeners using async oneshot channels
11 pub struct BroadcastData
<T
> {
/// Outcome recorded by `notify_listeners`; an error is kept as a plain
/// `String` message rather than as an `Error` value.
12 result
: Option
<Result
<T
, String
>>,
/// Sending halves of the channels created by `listen`; `notify_listeners`
/// removes them with `pop()` when it delivers a result.
13 listeners
: Vec
<oneshot
::Sender
<Result
<T
, Error
>>>,
// `T: Clone` is needed because the result is cloned for storage and again for
// every listener that receives it.
16 impl <T
: Clone
> BroadcastData
<T
> {
/// Create a new `BroadcastData`.
18 pub fn new() -> Self {
/// Record `result` and deliver it to registered listeners.
///
/// A successful value is cloned for the listener; an error message is turned
/// into an `Error` with `format_err!`.
25 pub fn notify_listeners(&mut self, result
: Result
<T
, String
>) {
// store a copy of the outcome in `self.result`
27 self.result
= Some(result
.clone());
// `pop()` removes the listener from the list as it is taken
30 match self.listeners
.pop() {
// `let _ =`: the outcome of `send` is deliberately ignored
// (presumably the receiving side may already be gone — confirm)
34 Ok(result
) => { let _ = ch.send(Ok(result.clone())); }
,
35 Err(err
) => { let _ = ch.send(Err(format_err!("{}
", err))); },
/// Register a listener and return a future for the broadcast result.
///
/// For the `Some(Ok(..))` / `Some(Err(..))` cases the returned future is
/// already completed (`Either::Left`) with a clone of the value or with an
/// `Error` formatted from the stored message. Otherwise a new oneshot channel
/// is created and its sender is pushed onto `self.listeners`.
42 pub fn listen(&mut self) -> impl Future<Output = Result<T, Error>> {
43 use futures::future::{ok, Either};
47 Some(Ok(result)) => return Either::Left(ok(result.clone())),
48 Some(Err(err)) => return Either::Left(futures::future::err(format_err!("{}
", err))),
51 let (tx, rx) = oneshot::channel::<Result<T, Error>>();
// keep the sending half so that `notify_listeners` can reach this listener
53 self.listeners.push(tx);
56 .map(|res| match res {
// an error in `res` is converted into `Error` for the caller
59 Err(e) => Err(Error::from(e)),
/// Pinned, boxed, `Send` future yielding `Result<T, Error>`; this is the type
/// of the source future held by `BroadCastFutureBinding`.
65 type SourceFuture<T> = Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>;
/// State shared by a `BroadcastFuture`: the listener registry together with
/// the source future whose result is broadcast.
67 struct BroadCastFutureBinding<T> {
/// Listener registry that receives the result of the source future.
68 broadcast: BroadcastData<T>,
/// Source future; `spawn` removes it with `take()`, leaving `None` behind.
69 future: Option<SourceFuture<T>>,
72 /// Broadcast future results to registered listeners
73 pub struct BroadcastFuture<T> {
/// Shared state; `listen` clones this handle into the future it returns.
74 inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
77 impl<T: Clone + Send + 'static> BroadcastFuture<T> {
78 /// Create instance for specified source future.
80 /// The result of the future is sent to all registered listeners.
81 pub fn new(source: Box<dyn Future<Output = Result<T, Error>> + Send>) -> Self {
82 let inner = BroadCastFutureBinding {
83 broadcast: BroadcastData::new(),
// `Pin::from` turns the boxed future into the pinned `SourceFuture`
84 future: Some(Pin::from(source)),
86 Self { inner: Arc::new(Mutex::new(inner)) }
89 /// Creates a new instance with a oneshot channel as trigger
///
/// Returns the instance together with the sending half of the channel; the
/// receiving half is boxed and used as the source future.
90 pub fn new_oneshot() -> (Self, oneshot::Sender<Result<T, Error>>) {
92 let (tx, rx) = oneshot::channel::<Result<T, Error>>();
95 .and_then(futures::future::ready);
97 (Self::new(Box::new(rx)), tx)
// Parameters and body of the private `notify_listeners` helper that `spawn`
// calls: the shared state handle and the result to broadcast.
101 inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
102 result: Result<T, String>,
// `unwrap()` panics if the mutex is poisoned
104 let mut data = inner.lock().unwrap();
105 data.broadcast.notify_listeners(result);
// Takes the source future out of the shared state (only possible once, since
// `take()` leaves `None`), wraps it so that its outcome is passed on to
// `notify_listeners`, and returns the future from `data.broadcast.listen()`.
108 fn spawn(inner: Arc<Mutex<BroadCastFutureBinding<T>>>) -> impl Future<Output = Result<T, Error>> {
109 let mut data = inner.lock().unwrap();
111 if let Some(source) = data.future.take() {
// separate handle for the closure, which is `move`
113 let inner1 = inner.clone();
115 let task = source.map(move |value| {
117 Ok(value) => Self::notify_listeners(inner1, Ok(value)),
// the error is reduced to its message before it is broadcast
118 Err(err) => Self::notify_listeners(inner1, Err(err.to_string())),
124 data.broadcast.listen()
127 /// Register a listener
///
/// `spawn` is called inside the returned `async` block, i.e. only once the
/// returned future is polled.
128 pub fn listen(&self) -> impl Future<Output = Result<T, Error>> {
129 let inner2 = self.inner.clone();
130 async move { Self::spawn(inner2).await }
// Checks that listeners registered before the trigger fires, and one
// registered afterwards, all receive the value sent through the trigger.
// Each listener adds a different multiple of the value to `CHECKSUM`.
135 fn test_broadcast_future() {
136 use std::sync::atomic::{AtomicUsize, Ordering};
138 static CHECKSUM: AtomicUsize = AtomicUsize::new(0);
// `sender` is the `BroadcastFuture`; `trigger` is the oneshot sender feeding it
140 let (sender, trigger) = BroadcastFuture::new_oneshot();
// first listener: contributes `res`
142 let receiver1 = sender.listen()
144 CHECKSUM.fetch_add(res, Ordering::SeqCst);
146 .map_err(|err| { panic!("got error {}", err
); })
// second listener: contributes `res*2`
149 let receiver2
= sender
.listen()
151 CHECKSUM
.fetch_add(res
*2, Ordering
::SeqCst
);
153 .map_err(|err
| { panic!("got error {}
", err); })
156 let rt = tokio::runtime::Runtime::new().unwrap();
157 rt.block_on(async move {
158 let r1 = tokio::spawn(receiver1);
159 let r2 = tokio::spawn(receiver2);
// fire the trigger with the value 1
161 trigger.send(Ok(1)).unwrap();
166 let result = CHECKSUM.load(Ordering::SeqCst);
// 1 (first listener) + 2 (second listener)
168 assert_eq!(result, 3);
170 // the result stays available until the BroadcastFuture is dropped
171 rt.block_on(sender.listen()
173 CHECKSUM.fetch_add(res*4, Ordering::SeqCst);
175 .map_err(|err| { panic!("got error {}", err
); })
// 3 from before + 4 from the late listener
178 let result
= CHECKSUM
.load(Ordering
::SeqCst
);
179 assert_eq
!(result
, 7);