]> git.proxmox.com Git - proxmox-backup.git/blame - src/tools/futures.rs
update a chunk of stuff to the hyper release
[proxmox-backup.git] / src / tools / futures.rs
CommitLineData
382609b0
WB
1//! Provides utilities to deal with futures, such as a `Cancellable` future.
2
e668912a
WB
3use std::future::Future;
4use std::pin::Pin;
382609b0 5use std::sync::{Arc, Mutex};
e668912a 6use std::task::{Context, Poll};
382609b0
WB
7
8use failure::Error;
e668912a 9use futures::future::FutureExt;
db0cb9ce 10use tokio::sync::oneshot;
382609b0
WB
11
12/// Make a future cancellable.
13///
14/// This simply performs a `select()` on the future and something waiting for a signal. If the
15/// future finishes successfully, it yields `Some(T::Item)`. If it was cancelled, it'll yield
16/// `None`.
17///
18/// In order to cancel the future, a `Canceller` is used.
19///
20/// ```no_run
e668912a 21/// # use std::future::Future;
382609b0 22/// # use failure::Error;
e668912a 23/// # use futures::future::FutureExt;
7fb49397
WB
24/// # use proxmox_backup::tools::futures::Cancellable;
25/// # fn doc<T>(future: T) -> Result<(), Error>
26/// # where
e668912a 27/// # T: Future<Output = i32> + Unpin + Send + Sync + 'static,
7fb49397
WB
28/// # {
29/// let (future, canceller) = Cancellable::new(future)?;
e668912a 30/// tokio::spawn(future.map(|res| {
7fb49397
WB
31/// match res {
32/// Some(value) => println!("Future finished with {}", value),
33/// None => println!("Future was cancelled"),
34/// }
7fb49397 35/// }));
382609b0
WB
36/// // Do something
37/// canceller.cancel();
7fb49397 38/// # Ok(())
382609b0
WB
39/// # }
40/// ```
e668912a 41pub struct Cancellable<T: Future + Unpin> {
382609b0
WB
42 /// Our core: we're waiting on a future, on on a lock. The cancel method just unlocks the
43 /// lock, so that our LockFuture finishes.
db0cb9ce 44 inner: futures::future::Select<T, oneshot::Receiver<()>>,
382609b0
WB
45
46 /// When this future is created, this holds a guard. When a `Canceller` wants to cancel the
47 /// future, it'll drop this guard, causing our inner future to resolve to `None`.
db0cb9ce 48 sender: Arc<Mutex<Option<oneshot::Sender<()>>>>,
382609b0
WB
49}
50
51/// Reference to a cancellable future. Multiple instances may exist simultaneously.
52///
53/// This allows cancelling another future. If the future already finished, nothing happens.
390e83c9
WB
54///
55/// This can be cloned to be used in multiple places.
382609b0 56#[derive(Clone)]
db0cb9ce 57pub struct Canceller(Arc<Mutex<Option<oneshot::Sender<()>>>>);
382609b0
WB
58
59impl Canceller {
60 /// Cancel the associated future.
61 ///
62 /// This does nothing if the future already finished successfully.
63 pub fn cancel(&self) {
db0cb9ce 64 let _ = self.0.lock().unwrap().take().unwrap().send(());
382609b0
WB
65 }
66}
67
e668912a 68impl<T: Future + Unpin> Cancellable<T> {
382609b0
WB
69 /// Make a future cancellable.
70 ///
71 /// Returns a future and a `Canceller` which can be cloned and used later to cancel the future.
72 pub fn new(inner: T) -> Result<(Self, Canceller), Error> {
db0cb9ce
WB
73 // we don't even need to store the mutex...
74 let (tx, rx) = oneshot::channel();
382609b0 75 let this = Self {
db0cb9ce
WB
76 inner: futures::future::select(inner, rx),
77 sender: Arc::new(Mutex::new(Some(tx))),
382609b0 78 };
db0cb9ce 79
382609b0
WB
80 let canceller = this.canceller();
81 Ok((this, canceller))
82 }
83
390e83c9 84 /// Create another `Canceller` for this future.
382609b0 85 pub fn canceller(&self) -> Canceller {
db0cb9ce 86 Canceller(Arc::clone(&self.sender))
382609b0
WB
87 }
88}
89
90/// Make a future cancellable.
91///
92/// This is a shortcut for `Cancellable::new`
e668912a 93pub fn cancellable<T: Future + Unpin>(future: T) -> Result<(Cancellable<T>, Canceller), Error> {
382609b0
WB
94 Cancellable::new(future)
95}
96
e668912a
WB
97impl<T: Future + Unpin> Future for Cancellable<T> {
98 type Output = Option<<T as Future>::Output>;
99 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
382609b0 100 use futures::future::Either;
e668912a
WB
101 match self.inner.poll_unpin(cx) {
102 Poll::Ready(Either::Left((output, _))) => Poll::Ready(Some(output)),
103 Poll::Ready(Either::Right(_)) => Poll::Ready(None),
104 Poll::Pending => Poll::Pending,
382609b0
WB
105 }
106 }
107}