]> git.proxmox.com Git - proxmox-backup.git/blob - src/tools/futures.rs
fix broadcast_future test case
[proxmox-backup.git] / src / tools / futures.rs
1 //! Provides utilities to deal with futures, such as a `Cancellable` future.
2
3 use std::future::Future;
4 use std::pin::Pin;
5 use std::sync::{Arc, Mutex};
6 use std::task::{Context, Poll};
7
8 use failure::Error;
9 use futures::future::FutureExt;
10 use tokio::sync::oneshot;
11
12 /// Make a future cancellable.
13 ///
14 /// This simply performs a `select()` on the future and something waiting for a signal. If the
15 /// future finishes successfully, it yields `Some(T::Item)`. If it was cancelled, it'll yield
16 /// `None`.
17 ///
18 /// In order to cancel the future, a `Canceller` is used.
19 ///
20 /// ```no_run
21 /// # use std::future::Future;
22 /// # use failure::Error;
23 /// # use futures::future::FutureExt;
24 /// # use proxmox_backup::tools::futures::Cancellable;
25 /// # fn doc<T>(future: T) -> Result<(), Error>
26 /// # where
27 /// # T: Future<Output = i32> + Unpin + Send + Sync + 'static,
28 /// # {
29 /// let (future, canceller) = Cancellable::new(future)?;
30 /// tokio::spawn(future.map(|res| {
31 /// match res {
32 /// Some(value) => println!("Future finished with {}", value),
33 /// None => println!("Future was cancelled"),
34 /// }
35 /// }));
36 /// // Do something
37 /// canceller.cancel();
38 /// # Ok(())
39 /// # }
40 /// ```
41 pub struct Cancellable<T: Future + Unpin> {
42 /// Our core: we're waiting on a future, on on a lock. The cancel method just unlocks the
43 /// lock, so that our LockFuture finishes.
44 inner: futures::future::Select<T, oneshot::Receiver<()>>,
45
46 /// When this future is created, this holds a guard. When a `Canceller` wants to cancel the
47 /// future, it'll drop this guard, causing our inner future to resolve to `None`.
48 sender: Arc<Mutex<Option<oneshot::Sender<()>>>>,
49 }
50
51 /// Reference to a cancellable future. Multiple instances may exist simultaneously.
52 ///
53 /// This allows cancelling another future. If the future already finished, nothing happens.
54 ///
55 /// This can be cloned to be used in multiple places.
56 #[derive(Clone)]
57 pub struct Canceller(Arc<Mutex<Option<oneshot::Sender<()>>>>);
58
59 impl Canceller {
60 /// Cancel the associated future.
61 ///
62 /// This does nothing if the future already finished successfully.
63 pub fn cancel(&self) {
64 if let Some(sender) = self.0.lock().unwrap().take() {
65 let _ = sender.send(());
66 }
67 }
68 }
69
70 impl<T: Future + Unpin> Cancellable<T> {
71 /// Make a future cancellable.
72 ///
73 /// Returns a future and a `Canceller` which can be cloned and used later to cancel the future.
74 pub fn new(inner: T) -> Result<(Self, Canceller), Error> {
75 // we don't even need to store the mutex...
76 let (tx, rx) = oneshot::channel();
77 let this = Self {
78 inner: futures::future::select(inner, rx),
79 sender: Arc::new(Mutex::new(Some(tx))),
80 };
81
82 let canceller = this.canceller();
83 Ok((this, canceller))
84 }
85
86 /// Create another `Canceller` for this future.
87 pub fn canceller(&self) -> Canceller {
88 Canceller(Arc::clone(&self.sender))
89 }
90 }
91
92 /// Make a future cancellable.
93 ///
94 /// This is a shortcut for `Cancellable::new`
95 pub fn cancellable<T: Future + Unpin>(future: T) -> Result<(Cancellable<T>, Canceller), Error> {
96 Cancellable::new(future)
97 }
98
99 impl<T: Future + Unpin> Future for Cancellable<T> {
100 type Output = Option<<T as Future>::Output>;
101 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
102 use futures::future::Either;
103 match self.inner.poll_unpin(cx) {
104 Poll::Ready(Either::Left((output, _))) => Poll::Ready(Some(output)),
105 Poll::Ready(Either::Right(_)) => Poll::Ready(None),
106 Poll::Pending => Poll::Pending,
107 }
108 }
109 }