1 //! Provides utilities to deal with futures, such as a `Cancellable` future.
3 use std
::future
::Future
;
5 use std
::sync
::{Arc, Mutex}
;
6 use std
::task
::{Context, Poll}
;
9 use futures
::future
::FutureExt
;
11 use crate::tools
::async_mutex
::{AsyncLockGuard, AsyncMutex, LockFuture}
;
/// Make a future cancellable.
///
/// This simply performs a `select()` on the future and something waiting for a signal. If the
/// future finishes successfully, it yields `Some(T::Output)`. If it was cancelled, it'll yield
/// `None`.
///
/// In order to cancel the future, a `Canceller` is used.
22 /// # use std::future::Future;
23 /// # use failure::Error;
24 /// # use futures::future::FutureExt;
25 /// # use proxmox_backup::tools::futures::Cancellable;
26 /// # fn doc<T>(future: T) -> Result<(), Error>
28 /// # T: Future<Output = i32> + Unpin + Send + Sync + 'static,
30 /// let (future, canceller) = Cancellable::new(future)?;
31 /// tokio::spawn(future.map(|res| {
33 /// Some(value) => println!("Future finished with {}", value),
34 /// None => println!("Future was cancelled"),
38 /// canceller.cancel();
42 pub struct Cancellable
<T
: Future
+ Unpin
> {
43 /// Our core: we're waiting on a future, on on a lock. The cancel method just unlocks the
44 /// lock, so that our LockFuture finishes.
45 inner
: futures
::future
::Select
<T
, LockFuture
<()>>,
47 /// When this future is created, this holds a guard. When a `Canceller` wants to cancel the
48 /// future, it'll drop this guard, causing our inner future to resolve to `None`.
49 guard
: Arc
<Mutex
<Option
<AsyncLockGuard
<()>>>>,
52 /// Reference to a cancellable future. Multiple instances may exist simultaneously.
54 /// This allows cancelling another future. If the future already finished, nothing happens.
56 /// This can be cloned to be used in multiple places.
58 pub struct Canceller(Arc
<Mutex
<Option
<AsyncLockGuard
<()>>>>);
61 /// Cancel the associated future.
63 /// This does nothing if the future already finished successfully.
64 pub fn cancel(&self) {
65 *self.0.lock().unwrap() = None
;
69 impl<T
: Future
+ Unpin
> Cancellable
<T
> {
70 /// Make a future cancellable.
72 /// Returns a future and a `Canceller` which can be cloned and used later to cancel the future.
73 pub fn new(inner
: T
) -> Result
<(Self, Canceller
), Error
> {
74 // we don't even need to sture the mutex...
75 let (mutex
, guard
) = AsyncMutex
::new_locked(())?
;
77 inner
: futures
::future
::select(inner
, mutex
.lock()),
78 guard
: Arc
::new(Mutex
::new(Some(guard
))),
80 let canceller
= this
.canceller();
84 /// Create another `Canceller` for this future.
85 pub fn canceller(&self) -> Canceller
{
86 Canceller(self.guard
.clone())
90 /// Make a future cancellable.
92 /// This is a shortcut for `Cancellable::new`
93 pub fn cancellable
<T
: Future
+ Unpin
>(future
: T
) -> Result
<(Cancellable
<T
>, Canceller
), Error
> {
94 Cancellable
::new(future
)
97 impl<T
: Future
+ Unpin
> Future
for Cancellable
<T
> {
98 type Output
= Option
<<T
as Future
>::Output
>;
99 fn poll(mut self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Self::Output
> {
100 use futures
::future
::Either
;
101 match self.inner
.poll_unpin(cx
) {
102 Poll
::Ready(Either
::Left((output
, _
))) => Poll
::Ready(Some(output
)),
103 Poll
::Ready(Either
::Right(_
)) => Poll
::Ready(None
),
104 Poll
::Pending
=> Poll
::Pending
,