]> git.proxmox.com Git - proxmox-backup.git/blob - src/tools/broadcast_future.rs
917c343ed396e193261d0e147c0786fe9a259a06
[proxmox-backup.git] / src / tools / broadcast_future.rs
1 use failure::*;
2 use std::sync::{Mutex, Arc};
3
4 use futures::*;
5 use tokio::sync::oneshot;
6
7 struct BroadcastData<T> {
8 result: Option<Result<T, String>>,
9 listeners: Vec<oneshot::Sender<Result<T, Error>>>,
10 source: Option<Box<Future<Item=T, Error=Error> + Send >>,
11 }
12
13 /// Broadcast future results to registered listeners
14 pub struct BroadcastFuture<T> {
15 inner: Arc<Mutex<BroadcastData<T>>>,
16 }
17
18 impl <T: Clone + Send + 'static> BroadcastFuture<T> {
19
20 /// Create instance for specified source future.
21 ///
22 /// The result of the future is sent to all registered listeners.
23 pub fn new(source: Box<Future<Item=T, Error=Error> + Send>) -> Self {
24 let data = BroadcastData {
25 result: None,
26 listeners: vec![],
27 source: Some(source),
28 };
29 Self { inner: Arc::new(Mutex::new(data)) }
30 }
31
32 fn update(inner: Arc<Mutex<BroadcastData<T>>>, result: Result<T, String>) {
33 let mut data = inner.lock().unwrap();
34
35 data.result = Some(result.clone());
36
37 loop {
38 match data.listeners.pop() {
39 None => { break; },
40 Some(ch) => {
41 match &result {
42 Ok(result) => { let _ = ch.send(Ok(result.clone())); },
43 Err(err) => { let _ = ch.send(Err(format_err!("{}", err))); },
44 }
45 },
46 }
47 }
48 }
49
50 fn spawn(inner: Arc<Mutex<BroadcastData<T>>>) -> impl Future<Item=T, Error=Error> {
51
52 let mut data = inner.lock().unwrap();
53
54 match &data.result {
55 None => {},
56 Some(Ok(result)) => return futures::future::Either::A(futures::future::ok(result.clone())),
57 Some(Err(err)) => return futures::future::Either::A(futures::future::err(format_err!("{}", err))),
58 }
59
60 let (tx, rx) = oneshot::channel::<Result<T, Error>>();
61
62 data.listeners.push(tx);
63
64 if let Some(source) = data.source.take() {
65
66 let inner1 = inner.clone();
67
68 let task = source.then(move |value| {
69 match value {
70 Ok(value) => Self::update(inner1, Ok(value.clone())),
71 Err(err) => Self::update(inner1, Err(err.to_string())),
72 }
73 Ok(())
74 });
75 tokio::spawn(task);
76 }
77
78 futures::future::Either::B(rx.map_err(Error::from).and_then(|result| { result }))
79 }
80
81 /// Register a listener
82 pub fn listen(&self) -> impl Future<Item=T, Error=Error> {
83 let inner2 = self.inner.clone();
84 futures::future::lazy(move || { Self::spawn(inner2) })
85 }
86 }