]>
Commit | Line | Data |
---|---|---|
56e351c9 WB |
1 | use std::future::Future; |
2 | use std::pin::Pin; | |
3 | use std::sync::{Arc, Mutex}; | |
490be29e | 4 | |
f7d4e4b5 | 5 | use anyhow::{format_err, Error}; |
56e351c9 | 6 | use futures::future::{FutureExt, TryFutureExt}; |
490be29e DM |
7 | use tokio::sync::oneshot; |
8 | ||
824b5ee4 | 9 | /// Broadcast results to registered listeners using asnyc oneshot channels |
62ee2eb4 | 10 | #[derive(Default)] |
824b5ee4 | 11 | pub struct BroadcastData<T> { |
490be29e DM |
12 | result: Option<Result<T, String>>, |
13 | listeners: Vec<oneshot::Sender<Result<T, Error>>>, | |
490be29e DM |
14 | } |
15 | ||
824b5ee4 | 16 | impl <T: Clone> BroadcastData<T> { |
490be29e | 17 | |
824b5ee4 DM |
18 | pub fn new() -> Self { |
19 | Self { | |
490be29e DM |
20 | result: None, |
21 | listeners: vec![], | |
824b5ee4 | 22 | } |
3dceb9b3 DM |
23 | } |
24 | ||
824b5ee4 | 25 | pub fn notify_listeners(&mut self, result: Result<T, String>) { |
490be29e | 26 | |
824b5ee4 | 27 | self.result = Some(result.clone()); |
490be29e DM |
28 | |
29 | loop { | |
824b5ee4 | 30 | match self.listeners.pop() { |
490be29e DM |
31 | None => { break; }, |
32 | Some(ch) => { | |
33 | match &result { | |
34 | Ok(result) => { let _ = ch.send(Ok(result.clone())); }, | |
35 | Err(err) => { let _ = ch.send(Err(format_err!("{}", err))); }, | |
36 | } | |
37 | }, | |
38 | } | |
39 | } | |
40 | } | |
41 | ||
56e351c9 WB |
42 | pub fn listen(&mut self) -> impl Future<Output = Result<T, Error>> { |
43 | use futures::future::{ok, Either}; | |
490be29e | 44 | |
824b5ee4 | 45 | match &self.result { |
490be29e | 46 | None => {}, |
56e351c9 WB |
47 | Some(Ok(result)) => return Either::Left(ok(result.clone())), |
48 | Some(Err(err)) => return Either::Left(futures::future::err(format_err!("{}", err))), | |
490be29e DM |
49 | } |
50 | ||
51 | let (tx, rx) = oneshot::channel::<Result<T, Error>>(); | |
52 | ||
824b5ee4 DM |
53 | self.listeners.push(tx); |
54 | ||
56e351c9 WB |
55 | Either::Right(rx |
56 | .map(|res| match res { | |
57 | Ok(Ok(t)) => Ok(t), | |
58 | Ok(Err(e)) => Err(e), | |
59 | Err(e) => Err(Error::from(e)), | |
60 | }) | |
61 | ) | |
824b5ee4 DM |
62 | } |
63 | } | |
64 | ||
65 | /// Broadcast future results to registered listeners | |
66 | pub struct BroadcastFuture<T> { | |
56e351c9 WB |
67 | inner: Arc< |
68 | Mutex<( | |
69 | BroadcastData<T>, | |
70 | Option<Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>>, | |
71 | )>, | |
72 | >, | |
824b5ee4 DM |
73 | } |
74 | ||
56e351c9 | 75 | impl<T: Clone + Send + 'static> BroadcastFuture<T> { |
824b5ee4 DM |
76 | /// Create instance for specified source future. |
77 | /// | |
78 | /// The result of the future is sent to all registered listeners. | |
56e351c9 WB |
79 | pub fn new(source: Box<dyn Future<Output = Result<T, Error>> + Send>) -> Self { |
80 | Self { inner: Arc::new(Mutex::new((BroadcastData::new(), Some(Pin::from(source))))) } | |
824b5ee4 DM |
81 | } |
82 | ||
83 | /// Creates a new instance with a oneshot channel as trigger | |
84 | pub fn new_oneshot() -> (Self, oneshot::Sender<Result<T, Error>>) { | |
85 | ||
86 | let (tx, rx) = oneshot::channel::<Result<T, Error>>(); | |
56e351c9 WB |
87 | let rx = rx |
88 | .map_err(Error::from) | |
62ee2eb4 | 89 | .and_then(futures::future::ready); |
824b5ee4 | 90 | |
56e351c9 | 91 | (Self::new(Box::new(rx)), tx) |
824b5ee4 DM |
92 | } |
93 | ||
56e351c9 WB |
94 | fn notify_listeners( |
95 | inner: Arc< | |
96 | Mutex<( | |
97 | BroadcastData<T>, | |
98 | Option<Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>>, | |
99 | )>, | |
100 | >, | |
101 | result: Result<T, String>, | |
102 | ) { | |
824b5ee4 DM |
103 | let mut data = inner.lock().unwrap(); |
104 | data.0.notify_listeners(result); | |
105 | } | |
106 | ||
56e351c9 WB |
107 | fn spawn( |
108 | inner: Arc< | |
109 | Mutex<( | |
110 | BroadcastData<T>, | |
111 | Option<Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>>, | |
112 | )>, | |
113 | >, | |
114 | ) -> impl Future<Output = Result<T, Error>> { | |
824b5ee4 | 115 | let mut data = inner.lock().unwrap(); |
490be29e | 116 | |
824b5ee4 | 117 | if let Some(source) = data.1.take() { |
490be29e DM |
118 | |
119 | let inner1 = inner.clone(); | |
120 | ||
56e351c9 | 121 | let task = source.map(move |value| { |
490be29e | 122 | match value { |
824b5ee4 DM |
123 | Ok(value) => Self::notify_listeners(inner1, Ok(value.clone())), |
124 | Err(err) => Self::notify_listeners(inner1, Err(err.to_string())), | |
490be29e | 125 | } |
490be29e DM |
126 | }); |
127 | tokio::spawn(task); | |
128 | } | |
129 | ||
824b5ee4 | 130 | data.0.listen() |
490be29e DM |
131 | } |
132 | ||
133 | /// Register a listener | |
56e351c9 | 134 | pub fn listen(&self) -> impl Future<Output = Result<T, Error>> { |
490be29e | 135 | let inner2 = self.inner.clone(); |
56e351c9 | 136 | async move { Self::spawn(inner2).await } |
490be29e DM |
137 | } |
138 | } | |
3dceb9b3 DM |
139 | |
140 | #[test] | |
141 | fn test_broadcast_future() { | |
142 | use std::sync::atomic::{AtomicUsize, Ordering}; | |
143 | ||
144 | static CHECKSUM: AtomicUsize = AtomicUsize::new(0); | |
145 | ||
146 | let (sender, trigger) = BroadcastFuture::new_oneshot(); | |
147 | ||
148 | let receiver1 = sender.listen() | |
56e351c9 | 149 | .map_ok(|res| { |
3dceb9b3 | 150 | CHECKSUM.fetch_add(res, Ordering::SeqCst); |
3dceb9b3 | 151 | }) |
add5861e | 152 | .map_err(|err| { panic!("got error {}", err); }) |
56e351c9 | 153 | .map(|_| ()); |
3dceb9b3 DM |
154 | |
155 | let receiver2 = sender.listen() | |
56e351c9 | 156 | .map_ok(|res| { |
3dceb9b3 | 157 | CHECKSUM.fetch_add(res*2, Ordering::SeqCst); |
3dceb9b3 | 158 | }) |
add5861e | 159 | .map_err(|err| { panic!("got error {}", err); }) |
56e351c9 | 160 | .map(|_| ()); |
3dceb9b3 | 161 | |
f374ba4c | 162 | let mut rt = tokio::runtime::Runtime::new().unwrap(); |
56e351c9 | 163 | rt.block_on(async move { |
8554ac5e WB |
164 | let r1 = tokio::spawn(receiver1); |
165 | let r2 = tokio::spawn(receiver2); | |
3dceb9b3 DM |
166 | |
167 | trigger.send(Ok(1)).unwrap(); | |
8554ac5e WB |
168 | let _ = r1.await; |
169 | let _ = r2.await; | |
56e351c9 | 170 | }); |
3dceb9b3 DM |
171 | |
172 | let result = CHECKSUM.load(Ordering::SeqCst); | |
173 | ||
f374ba4c | 174 | assert_eq!(result, 3); |
3dceb9b3 | 175 | } |