]> git.proxmox.com Git - proxmox-backup.git/blob - src/tools/futures.rs
19cd2a166cb3d6b8dd30a843062005f6679bf6d0
[proxmox-backup.git] / src / tools / futures.rs
1 //! Provides utilities to deal with futures, such as a `Cancellable` future.
2
3 use std::future::Future;
4 use std::pin::Pin;
5 use std::sync::{Arc, Mutex};
6 use std::task::{Context, Poll};
7
8 use failure::Error;
9 use futures::future::FutureExt;
10
11 use crate::tools::async_mutex::{AsyncLockGuard, AsyncMutex, LockFuture};
12
13 /// Make a future cancellable.
14 ///
15 /// This simply performs a `select()` on the future and something waiting for a signal. If the
16 /// future finishes successfully, it yields `Some(T::Item)`. If it was cancelled, it'll yield
17 /// `None`.
18 ///
19 /// In order to cancel the future, a `Canceller` is used.
20 ///
21 /// ```no_run
22 /// # use std::future::Future;
23 /// # use failure::Error;
24 /// # use futures::future::FutureExt;
25 /// # use proxmox_backup::tools::futures::Cancellable;
26 /// # fn doc<T>(future: T) -> Result<(), Error>
27 /// # where
28 /// # T: Future<Output = i32> + Unpin + Send + Sync + 'static,
29 /// # {
30 /// let (future, canceller) = Cancellable::new(future)?;
31 /// tokio::spawn(future.map(|res| {
32 /// match res {
33 /// Some(value) => println!("Future finished with {}", value),
34 /// None => println!("Future was cancelled"),
35 /// }
36 /// }));
37 /// // Do something
38 /// canceller.cancel();
39 /// # Ok(())
40 /// # }
41 /// ```
42 pub struct Cancellable<T: Future + Unpin> {
43 /// Our core: we're waiting on a future, on on a lock. The cancel method just unlocks the
44 /// lock, so that our LockFuture finishes.
45 inner: futures::future::Select<T, LockFuture<()>>,
46
47 /// When this future is created, this holds a guard. When a `Canceller` wants to cancel the
48 /// future, it'll drop this guard, causing our inner future to resolve to `None`.
49 guard: Arc<Mutex<Option<AsyncLockGuard<()>>>>,
50 }
51
/// Handle used to cancel a `Cancellable` future. Multiple instances may exist simultaneously.
///
/// Cancelling works through the guard slot shared with the future (see `Canceller::cancel`).
/// If the future already finished, nothing happens.
///
/// This can be cloned to be used in multiple places; all clones refer to the same shared slot
/// through the wrapped `Arc`.
#[derive(Clone)]
pub struct Canceller(Arc<Mutex<Option<AsyncLockGuard<()>>>>);
59
60 impl Canceller {
61 /// Cancel the associated future.
62 ///
63 /// This does nothing if the future already finished successfully.
64 pub fn cancel(&self) {
65 *self.0.lock().unwrap() = None;
66 }
67 }
68
69 impl<T: Future + Unpin> Cancellable<T> {
70 /// Make a future cancellable.
71 ///
72 /// Returns a future and a `Canceller` which can be cloned and used later to cancel the future.
73 pub fn new(inner: T) -> Result<(Self, Canceller), Error> {
74 // we don't even need to sture the mutex...
75 let (mutex, guard) = AsyncMutex::new_locked(())?;
76 let this = Self {
77 inner: futures::future::select(inner, mutex.lock()),
78 guard: Arc::new(Mutex::new(Some(guard))),
79 };
80 let canceller = this.canceller();
81 Ok((this, canceller))
82 }
83
84 /// Create another `Canceller` for this future.
85 pub fn canceller(&self) -> Canceller {
86 Canceller(self.guard.clone())
87 }
88 }
89
90 /// Make a future cancellable.
91 ///
92 /// This is a shortcut for `Cancellable::new`
93 pub fn cancellable<T: Future + Unpin>(future: T) -> Result<(Cancellable<T>, Canceller), Error> {
94 Cancellable::new(future)
95 }
96
97 impl<T: Future + Unpin> Future for Cancellable<T> {
98 type Output = Option<<T as Future>::Output>;
99 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
100 use futures::future::Either;
101 match self.inner.poll_unpin(cx) {
102 Poll::Ready(Either::Left((output, _))) => Poll::Ready(Some(output)),
103 Poll::Ready(Either::Right(_)) => Poll::Ready(None),
104 Poll::Pending => Poll::Pending,
105 }
106 }
107 }