]>
Commit | Line | Data |
---|---|---|
1 | //! Provides utilities to deal with futures, such as a `Cancellable` future. | |
2 | ||
3 | use std::future::Future; | |
4 | use std::pin::Pin; | |
5 | use std::sync::{Arc, Mutex}; | |
6 | use std::task::{Context, Poll}; | |
7 | ||
8 | use failure::Error; | |
9 | use futures::future::FutureExt; | |
10 | use tokio::sync::oneshot; | |
11 | ||
12 | /// Make a future cancellable. | |
13 | /// | |
14 | /// This simply performs a `select()` on the future and something waiting for a signal. If the | |
15 | /// future finishes successfully, it yields `Some(T::Item)`. If it was cancelled, it'll yield | |
16 | /// `None`. | |
17 | /// | |
18 | /// In order to cancel the future, a `Canceller` is used. | |
19 | /// | |
20 | /// ```no_run | |
21 | /// # use std::future::Future; | |
22 | /// # use failure::Error; | |
23 | /// # use futures::future::FutureExt; | |
24 | /// # use proxmox_backup::tools::futures::Cancellable; | |
25 | /// # fn doc<T>(future: T) -> Result<(), Error> | |
26 | /// # where | |
27 | /// # T: Future<Output = i32> + Unpin + Send + Sync + 'static, | |
28 | /// # { | |
29 | /// let (future, canceller) = Cancellable::new(future)?; | |
30 | /// tokio::spawn(future.map(|res| { | |
31 | /// match res { | |
32 | /// Some(value) => println!("Future finished with {}", value), | |
33 | /// None => println!("Future was cancelled"), | |
34 | /// } | |
35 | /// })); | |
36 | /// // Do something | |
37 | /// canceller.cancel(); | |
38 | /// # Ok(()) | |
39 | /// # } | |
40 | /// ``` | |
41 | pub struct Cancellable<T: Future + Unpin> { | |
42 | /// Our core: we're waiting on a future, on on a lock. The cancel method just unlocks the | |
43 | /// lock, so that our LockFuture finishes. | |
44 | inner: futures::future::Select<T, oneshot::Receiver<()>>, | |
45 | ||
46 | /// When this future is created, this holds a guard. When a `Canceller` wants to cancel the | |
47 | /// future, it'll drop this guard, causing our inner future to resolve to `None`. | |
48 | sender: Arc<Mutex<Option<oneshot::Sender<()>>>>, | |
49 | } | |
50 | ||
51 | /// Reference to a cancellable future. Multiple instances may exist simultaneously. | |
52 | /// | |
53 | /// This allows cancelling another future. If the future already finished, nothing happens. | |
54 | /// | |
55 | /// This can be cloned to be used in multiple places. | |
56 | #[derive(Clone)] | |
57 | pub struct Canceller(Arc<Mutex<Option<oneshot::Sender<()>>>>); | |
58 | ||
59 | impl Canceller { | |
60 | /// Cancel the associated future. | |
61 | /// | |
62 | /// This does nothing if the future already finished successfully. | |
63 | pub fn cancel(&self) { | |
64 | let _ = self.0.lock().unwrap().take().unwrap().send(()); | |
65 | } | |
66 | } | |
67 | ||
68 | impl<T: Future + Unpin> Cancellable<T> { | |
69 | /// Make a future cancellable. | |
70 | /// | |
71 | /// Returns a future and a `Canceller` which can be cloned and used later to cancel the future. | |
72 | pub fn new(inner: T) -> Result<(Self, Canceller), Error> { | |
73 | // we don't even need to store the mutex... | |
74 | let (tx, rx) = oneshot::channel(); | |
75 | let this = Self { | |
76 | inner: futures::future::select(inner, rx), | |
77 | sender: Arc::new(Mutex::new(Some(tx))), | |
78 | }; | |
79 | ||
80 | let canceller = this.canceller(); | |
81 | Ok((this, canceller)) | |
82 | } | |
83 | ||
84 | /// Create another `Canceller` for this future. | |
85 | pub fn canceller(&self) -> Canceller { | |
86 | Canceller(Arc::clone(&self.sender)) | |
87 | } | |
88 | } | |
89 | ||
90 | /// Make a future cancellable. | |
91 | /// | |
92 | /// This is a shortcut for `Cancellable::new` | |
93 | pub fn cancellable<T: Future + Unpin>(future: T) -> Result<(Cancellable<T>, Canceller), Error> { | |
94 | Cancellable::new(future) | |
95 | } | |
96 | ||
97 | impl<T: Future + Unpin> Future for Cancellable<T> { | |
98 | type Output = Option<<T as Future>::Output>; | |
99 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { | |
100 | use futures::future::Either; | |
101 | match self.inner.poll_unpin(cx) { | |
102 | Poll::Ready(Either::Left((output, _))) => Poll::Ready(Some(output)), | |
103 | Poll::Ready(Either::Right(_)) => Poll::Ready(None), | |
104 | Poll::Pending => Poll::Pending, | |
105 | } | |
106 | } | |
107 | } |