2 use std
::sync
::{Mutex, Arc}
;
5 use tokio
::sync
::oneshot
;
7 struct BroadcastData
<T
> {
8 result
: Option
<Result
<T
, String
>>,
9 listeners
: Vec
<oneshot
::Sender
<Result
<T
, Error
>>>,
10 source
: Option
<Box
<Future
<Item
=T
, Error
=Error
> + Send
>>,
13 /// Broadcast future results to registered listeners
14 pub struct BroadcastFuture
<T
> {
15 inner
: Arc
<Mutex
<BroadcastData
<T
>>>,
18 impl <T
: Clone
+ Send
+ '
static> BroadcastFuture
<T
> {
20 /// Create instance for specified source future.
22 /// The result of the future is sent to all registered listeners.
23 pub fn new(source
: Box
<Future
<Item
=T
, Error
=Error
> + Send
>) -> Self {
24 let data
= BroadcastData
{
29 Self { inner: Arc::new(Mutex::new(data)) }
32 fn update(inner
: Arc
<Mutex
<BroadcastData
<T
>>>, result
: Result
<T
, String
>) {
33 let mut data
= inner
.lock().unwrap();
35 data
.result
= Some(result
.clone());
38 match data
.listeners
.pop() {
42 Ok(result
) => { let _ = ch.send(Ok(result.clone())); }
,
43 Err(err
) => { let _ = ch.send(Err(format_err!("{}
", err))); },
50 fn spawn(inner: Arc<Mutex<BroadcastData<T>>>) -> impl Future<Item=T, Error=Error> {
52 let mut data = inner.lock().unwrap();
56 Some(Ok(result)) => return futures::future::Either::A(futures::future::ok(result.clone())),
57 Some(Err(err)) => return futures::future::Either::A(futures::future::err(format_err!("{}
", err))),
60 let (tx, rx) = oneshot::channel::<Result<T, Error>>();
62 data.listeners.push(tx);
64 if let Some(source) = data.source.take() {
66 let inner1 = inner.clone();
68 let task = source.then(move |value| {
70 Ok(value) => Self::update(inner1, Ok(value.clone())),
71 Err(err) => Self::update(inner1, Err(err.to_string())),
78 futures::future::Either::B(rx.map_err(Error::from).and_then(|result| { result }))
81 /// Register a listener
82 pub fn listen(&self) -> impl Future<Item=T, Error=Error> {
83 let inner2 = self.inner.clone();
84 futures::future::lazy(move || { Self::spawn(inner2) })