]>
git.proxmox.com Git - proxmox-backup.git/blob - src/tools/futures.rs
1 //! Provides utilities to deal with futures, such as a `Cancellable` future.
3 use std
::future
::Future
;
5 use std
::sync
::{Arc, Mutex}
;
6 use std
::task
::{Context, Poll}
;
9 use futures
::future
::FutureExt
;
10 use tokio
::sync
::oneshot
;
12 /// Make a future cancellable.
14 /// This simply performs a `select()` on the future and something waiting for a signal. If the
15 /// future finishes successfully, it yields `Some(T::Item)`. If it was cancelled, it'll yield
18 /// In order to cancel the future, a `Canceller` is used.
21 /// # use std::future::Future;
22 /// # use failure::Error;
23 /// # use futures::future::FutureExt;
24 /// # use proxmox_backup::tools::futures::Cancellable;
25 /// # fn doc<T>(future: T) -> Result<(), Error>
27 /// # T: Future<Output = i32> + Unpin + Send + Sync + 'static,
29 /// let (future, canceller) = Cancellable::new(future)?;
30 /// tokio::spawn(future.map(|res| {
32 /// Some(value) => println!("Future finished with {}", value),
33 /// None => println!("Future was cancelled"),
37 /// canceller.cancel();
41 pub struct Cancellable
<T
: Future
+ Unpin
> {
42 /// Our core: we're waiting on a future, on on a lock. The cancel method just unlocks the
43 /// lock, so that our LockFuture finishes.
44 inner
: futures
::future
::Select
<T
, oneshot
::Receiver
<()>>,
46 /// When this future is created, this holds a guard. When a `Canceller` wants to cancel the
47 /// future, it'll drop this guard, causing our inner future to resolve to `None`.
48 sender
: Arc
<Mutex
<Option
<oneshot
::Sender
<()>>>>,
51 /// Reference to a cancellable future. Multiple instances may exist simultaneously.
53 /// This allows cancelling another future. If the future already finished, nothing happens.
55 /// This can be cloned to be used in multiple places.
57 pub struct Canceller(Arc
<Mutex
<Option
<oneshot
::Sender
<()>>>>);
60 /// Cancel the associated future.
62 /// This does nothing if the future already finished successfully.
63 pub fn cancel(&self) {
64 if let Some(sender
) = self.0.lock().unwrap().take() {
65 let _
= sender
.send(());
70 impl<T
: Future
+ Unpin
> Cancellable
<T
> {
71 /// Make a future cancellable.
73 /// Returns a future and a `Canceller` which can be cloned and used later to cancel the future.
74 pub fn new(inner
: T
) -> Result
<(Self, Canceller
), Error
> {
75 // we don't even need to store the mutex...
76 let (tx
, rx
) = oneshot
::channel();
78 inner
: futures
::future
::select(inner
, rx
),
79 sender
: Arc
::new(Mutex
::new(Some(tx
))),
82 let canceller
= this
.canceller();
86 /// Create another `Canceller` for this future.
87 pub fn canceller(&self) -> Canceller
{
88 Canceller(Arc
::clone(&self.sender
))
92 /// Make a future cancellable.
94 /// This is a shortcut for `Cancellable::new`
95 pub fn cancellable
<T
: Future
+ Unpin
>(future
: T
) -> Result
<(Cancellable
<T
>, Canceller
), Error
> {
96 Cancellable
::new(future
)
99 impl<T
: Future
+ Unpin
> Future
for Cancellable
<T
> {
100 type Output
= Option
<<T
as Future
>::Output
>;
101 fn poll(mut self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Self::Output
> {
102 use futures
::future
::Either
;
103 match self.inner
.poll_unpin(cx
) {
104 Poll
::Ready(Either
::Left((output
, _
))) => Poll
::Ready(Some(output
)),
105 Poll
::Ready(Either
::Right(_
)) => Poll
::Ready(None
),
106 Poll
::Pending
=> Poll
::Pending
,