1 use std
::future
::Future
;
3 use std
::sync
::{Arc, Mutex}
;
5 use anyhow
::{format_err, Error}
;
6 use futures
::future
::{FutureExt, TryFutureExt}
;
7 use tokio
::sync
::oneshot
;
9 /// Broadcast results to registered listeners using async oneshot channels
11 pub struct BroadcastData
<T
> {
12 result
: Option
<Result
<T
, String
>>,
13 listeners
: Vec
<oneshot
::Sender
<Result
<T
, Error
>>>,
16 impl <T
: Clone
> BroadcastData
<T
> {
18 pub fn new() -> Self {
25 pub fn notify_listeners(&mut self, result
: Result
<T
, String
>) {
27 self.result
= Some(result
.clone());
30 match self.listeners
.pop() {
34 Ok(result
) => { let _ = ch.send(Ok(result.clone())); }
,
35 Err(err
) => { let _ = ch.send(Err(format_err!("{}
", err))); },
42 pub fn listen(&mut self) -> impl Future<Output = Result<T, Error>> {
43 use futures::future::{ok, Either};
47 Some(Ok(result)) => return Either::Left(ok(result.clone())),
48 Some(Err(err)) => return Either::Left(futures::future::err(format_err!("{}
", err))),
51 let (tx, rx) = oneshot::channel::<Result<T, Error>>();
53 self.listeners.push(tx);
56 .map(|res| match res {
59 Err(e) => Err(Error::from(e)),
65 /// Broadcast future results to registered listeners
66 pub struct BroadcastFuture<T> {
70 Option<Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>>,
75 impl<T: Clone + Send + 'static> BroadcastFuture<T> {
76 /// Create instance for specified source future.
78 /// The result of the future is sent to all registered listeners.
79 pub fn new(source: Box<dyn Future<Output = Result<T, Error>> + Send>) -> Self {
80 Self { inner: Arc::new(Mutex::new((BroadcastData::new(), Some(Pin::from(source))))) }
83 /// Creates a new instance with a oneshot channel as trigger
84 pub fn new_oneshot() -> (Self, oneshot::Sender<Result<T, Error>>) {
86 let (tx, rx) = oneshot::channel::<Result<T, Error>>();
89 .and_then(futures::future::ready);
91 (Self::new(Box::new(rx)), tx)
98 Option<Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>>,
101 result: Result<T, String>,
103 let mut data = inner.lock().unwrap();
104 data.0.notify_listeners(result);
111 Option<Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>>,
114 ) -> impl Future<Output = Result<T, Error>> {
115 let mut data = inner.lock().unwrap();
117 if let Some(source) = data.1.take() {
119 let inner1 = inner.clone();
121 let task = source.map(move |value| {
123 Ok(value) => Self::notify_listeners(inner1, Ok(value)),
124 Err(err) => Self::notify_listeners(inner1, Err(err.to_string())),
133 /// Register a listener
134 pub fn listen(&self) -> impl Future<Output = Result<T, Error>> {
135 let inner2 = self.inner.clone();
136 async move { Self::spawn(inner2).await }
141 fn test_broadcast_future() {
142 use std::sync::atomic::{AtomicUsize, Ordering};
144 static CHECKSUM: AtomicUsize = AtomicUsize::new(0);
146 let (sender, trigger) = BroadcastFuture::new_oneshot();
148 let receiver1 = sender.listen()
150 CHECKSUM.fetch_add(res, Ordering::SeqCst);
152 .map_err(|err| { panic!("got error {}", err
); })
155 let receiver2
= sender
.listen()
157 CHECKSUM
.fetch_add(res
*2, Ordering
::SeqCst
);
159 .map_err(|err
| { panic!("got error {}
", err); })
162 let rt = tokio::runtime::Runtime::new().unwrap();
163 rt.block_on(async move {
164 let r1 = tokio::spawn(receiver1);
165 let r2 = tokio::spawn(receiver2);
167 trigger.send(Ok(1)).unwrap();
172 let result = CHECKSUM.load(Ordering::SeqCst);
174 assert_eq!(result, 3);