]>
Commit | Line | Data |
---|---|---|
382609b0 WB |
1 | //! Provides utilities to deal with futures, such as a `Cancellable` future. |
2 | ||
e668912a WB |
3 | use std::future::Future; |
4 | use std::pin::Pin; | |
382609b0 | 5 | use std::sync::{Arc, Mutex}; |
e668912a | 6 | use std::task::{Context, Poll}; |
382609b0 WB |
7 | |
8 | use failure::Error; | |
e668912a | 9 | use futures::future::FutureExt; |
db0cb9ce | 10 | use tokio::sync::oneshot; |
382609b0 WB |
11 | |
12 | /// Make a future cancellable. | |
13 | /// | |
14 | /// This simply performs a `select()` on the future and something waiting for a signal. If the | |
15 | /// future finishes successfully, it yields `Some(T::Item)`. If it was cancelled, it'll yield | |
16 | /// `None`. | |
17 | /// | |
18 | /// In order to cancel the future, a `Canceller` is used. | |
19 | /// | |
20 | /// ```no_run | |
e668912a | 21 | /// # use std::future::Future; |
382609b0 | 22 | /// # use failure::Error; |
e668912a | 23 | /// # use futures::future::FutureExt; |
7fb49397 WB |
24 | /// # use proxmox_backup::tools::futures::Cancellable; |
25 | /// # fn doc<T>(future: T) -> Result<(), Error> | |
26 | /// # where | |
e668912a | 27 | /// # T: Future<Output = i32> + Unpin + Send + Sync + 'static, |
7fb49397 WB |
28 | /// # { |
29 | /// let (future, canceller) = Cancellable::new(future)?; | |
e668912a | 30 | /// tokio::spawn(future.map(|res| { |
7fb49397 WB |
31 | /// match res { |
32 | /// Some(value) => println!("Future finished with {}", value), | |
33 | /// None => println!("Future was cancelled"), | |
34 | /// } | |
7fb49397 | 35 | /// })); |
382609b0 WB |
36 | /// // Do something |
37 | /// canceller.cancel(); | |
7fb49397 | 38 | /// # Ok(()) |
382609b0 WB |
39 | /// # } |
40 | /// ``` | |
e668912a | 41 | pub struct Cancellable<T: Future + Unpin> { |
382609b0 WB |
42 | /// Our core: we're waiting on a future, on on a lock. The cancel method just unlocks the |
43 | /// lock, so that our LockFuture finishes. | |
db0cb9ce | 44 | inner: futures::future::Select<T, oneshot::Receiver<()>>, |
382609b0 WB |
45 | |
46 | /// When this future is created, this holds a guard. When a `Canceller` wants to cancel the | |
47 | /// future, it'll drop this guard, causing our inner future to resolve to `None`. | |
db0cb9ce | 48 | sender: Arc<Mutex<Option<oneshot::Sender<()>>>>, |
382609b0 WB |
49 | } |
50 | ||
51 | /// Reference to a cancellable future. Multiple instances may exist simultaneously. | |
52 | /// | |
53 | /// This allows cancelling another future. If the future already finished, nothing happens. | |
390e83c9 WB |
54 | /// |
55 | /// This can be cloned to be used in multiple places. | |
382609b0 | 56 | #[derive(Clone)] |
db0cb9ce | 57 | pub struct Canceller(Arc<Mutex<Option<oneshot::Sender<()>>>>); |
382609b0 WB |
58 | |
59 | impl Canceller { | |
60 | /// Cancel the associated future. | |
61 | /// | |
62 | /// This does nothing if the future already finished successfully. | |
63 | pub fn cancel(&self) { | |
db0cb9ce | 64 | let _ = self.0.lock().unwrap().take().unwrap().send(()); |
382609b0 WB |
65 | } |
66 | } | |
67 | ||
e668912a | 68 | impl<T: Future + Unpin> Cancellable<T> { |
382609b0 WB |
69 | /// Make a future cancellable. |
70 | /// | |
71 | /// Returns a future and a `Canceller` which can be cloned and used later to cancel the future. | |
72 | pub fn new(inner: T) -> Result<(Self, Canceller), Error> { | |
db0cb9ce WB |
73 | // we don't even need to store the mutex... |
74 | let (tx, rx) = oneshot::channel(); | |
382609b0 | 75 | let this = Self { |
db0cb9ce WB |
76 | inner: futures::future::select(inner, rx), |
77 | sender: Arc::new(Mutex::new(Some(tx))), | |
382609b0 | 78 | }; |
db0cb9ce | 79 | |
382609b0 WB |
80 | let canceller = this.canceller(); |
81 | Ok((this, canceller)) | |
82 | } | |
83 | ||
390e83c9 | 84 | /// Create another `Canceller` for this future. |
382609b0 | 85 | pub fn canceller(&self) -> Canceller { |
db0cb9ce | 86 | Canceller(Arc::clone(&self.sender)) |
382609b0 WB |
87 | } |
88 | } | |
89 | ||
90 | /// Make a future cancellable. | |
91 | /// | |
92 | /// This is a shortcut for `Cancellable::new` | |
e668912a | 93 | pub fn cancellable<T: Future + Unpin>(future: T) -> Result<(Cancellable<T>, Canceller), Error> { |
382609b0 WB |
94 | Cancellable::new(future) |
95 | } | |
96 | ||
e668912a WB |
97 | impl<T: Future + Unpin> Future for Cancellable<T> { |
98 | type Output = Option<<T as Future>::Output>; | |
99 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { | |
382609b0 | 100 | use futures::future::Either; |
e668912a WB |
101 | match self.inner.poll_unpin(cx) { |
102 | Poll::Ready(Either::Left((output, _))) => Poll::Ready(Some(output)), | |
103 | Poll::Ready(Either::Right(_)) => Poll::Ready(None), | |
104 | Poll::Pending => Poll::Pending, | |
382609b0 WB |
105 | } |
106 | } | |
107 | } |