From: Wolfgang Bumiller Date: Tue, 17 Dec 2019 09:52:07 +0000 (+0100) Subject: drop Cancellable future in favor of abortable X-Git-Tag: v0.1.3~300 X-Git-Url: https://git.proxmox.com/?a=commitdiff_plain;h=dc089345636121278876cfb99d1a82d5bacb63c5;p=proxmox-backup.git drop Cancellable future in favor of abortable futures-0.3 has a futures::future::abortable() function which does the exact same, returns an Abortable future with an AbortHandle providing an abort() method. Signed-off-by: Wolfgang Bumiller --- diff --git a/src/client/backup_reader.rs b/src/client/backup_reader.rs index 2c24ed09..9d49ed7f 100644 --- a/src/client/backup_reader.rs +++ b/src/client/backup_reader.rs @@ -5,11 +5,11 @@ use std::sync::Arc; use std::os::unix::fs::OpenOptionsExt; use chrono::{DateTime, Utc}; +use futures::future::AbortHandle; use serde_json::{json, Value}; use proxmox::tools::digest_to_hex; -use crate::tools::futures::Canceller; use crate::backup::*; use super::{HttpClient, H2Client}; @@ -17,21 +17,21 @@ use super::{HttpClient, H2Client}; /// Backup Reader pub struct BackupReader { h2: H2Client, - canceller: Canceller, + abort: AbortHandle, crypt_config: Option>, } impl Drop for BackupReader { fn drop(&mut self) { - self.canceller.cancel(); + self.abort.abort(); } } impl BackupReader { - fn new(h2: H2Client, canceller: Canceller, crypt_config: Option>) -> Arc { - Arc::new(Self { h2, canceller, crypt_config}) + fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option>) -> Arc { + Arc::new(Self { h2, abort, crypt_config}) } /// Create a new instance by upgrading the connection at '/api2/json/reader' @@ -54,9 +54,9 @@ impl BackupReader { }); let req = HttpClient::request_builder(client.server(), "GET", "/api2/json/reader", Some(param)).unwrap(); - let (h2, canceller) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())).await?; + let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())).await?; - Ok(BackupReader::new(h2, canceller, crypt_config)) + Ok(BackupReader::new(h2, abort, crypt_config)) } /// Execute a GET request @@ -119,7 +119,7 @@ impl BackupReader { } pub fn force_close(self) { - self.canceller.cancel(); + self.abort.abort(); } /// Download backup manifest (index.json) diff --git a/src/client/backup_writer.rs b/src/client/backup_writer.rs index 381cb31e..510b0248 100644 --- a/src/client/backup_writer.rs +++ b/src/client/backup_writer.rs @@ -6,6 +6,7 @@ use failure::*; use chrono::{DateTime, Utc}; use futures::*; use futures::stream::Stream; +use futures::future::AbortHandle; use serde_json::{json, Value}; use tokio::io::AsyncReadExt; use tokio::sync::{mpsc, oneshot}; @@ -14,19 +15,18 @@ use proxmox::tools::digest_to_hex; use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks}; use crate::backup::*; -use crate::tools::futures::Canceller; use super::{HttpClient, H2Client}; pub struct BackupWriter { h2: H2Client, - canceller: Canceller, + abort: AbortHandle, } impl Drop for BackupWriter { fn drop(&mut self) { - self.canceller.cancel(); + self.abort.abort(); } } @@ -37,8 +37,8 @@ pub struct BackupStats { impl BackupWriter { - fn new(h2: H2Client, canceller: Canceller) -> Arc { - Arc::new(Self { h2, canceller }) + fn new(h2: H2Client, abort: AbortHandle) -> Arc { + Arc::new(Self { h2, abort }) } pub async fn start( @@ -61,9 +61,9 @@ impl BackupWriter { let req = HttpClient::request_builder( client.server(), "GET", "/api2/json/backup", Some(param)).unwrap(); - let (h2, canceller) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?; + let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?; - Ok(BackupWriter::new(h2, canceller)) + Ok(BackupWriter::new(h2, abort)) } pub async fn get( @@ -129,13 +129,13 @@ impl BackupWriter { h2.post("finish", None) .map_ok(move |_| { - self.canceller.cancel(); + self.abort.abort(); }) .await } pub fn cancel(&self) { - self.canceller.cancel(); + self.abort.abort(); } pub async fn upload_blob( diff --git a/src/client/http_client.rs b/src/client/http_client.rs index ff5a4ec1..25f3dda9 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -20,7 +20,6 @@ use proxmox::tools::{ use super::pipe_to_stream::PipeToSendStream; use crate::tools::async_io::EitherStream; -use crate::tools::futures::{cancellable, Canceller}; use crate::tools::{self, tty, BroadcastFuture, DEFAULT_ENCODE_SET}; #[derive(Clone)] @@ -287,7 +286,7 @@ impl HttpClient { &self, mut req: Request, protocol_name: String, - ) -> Result<(H2Client, Canceller), Error> { + ) -> Result<(H2Client, futures::future::AbortHandle), Error> { let auth = self.login().await?; let client = self.client.clone(); @@ -323,7 +322,7 @@ impl HttpClient { let connection = connection .map_err(|_| panic!("HTTP/2.0 connection failed")); - let (connection, canceller) = cancellable(connection)?; + let (connection, abort) = futures::future::abortable(connection); // A cancellable future returns an Option which is None when cancelled and // Some when it finished instead, since we don't care about the return type we // need to map it away: @@ -334,7 +333,7 @@ impl HttpClient { // Wait until the `SendRequest` handle has available capacity. let c = h2.ready().await?; - Ok((H2Client::new(c), canceller)) + Ok((H2Client::new(c), abort)) } async fn credentials( diff --git a/src/tools.rs b/src/tools.rs index c63d8377..59c92dfa 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -23,7 +23,6 @@ pub mod async_io; pub mod borrow; pub mod daemon; pub mod fs; -pub mod futures; pub mod runtime; pub mod ticket; pub mod timer; diff --git a/src/tools/futures.rs b/src/tools/futures.rs deleted file mode 100644 index 893fd28d..00000000 --- a/src/tools/futures.rs +++ /dev/null @@ -1,109 +0,0 @@ -//! Provides utilities to deal with futures, such as a `Cancellable` future. - -use std::future::Future; -use std::pin::Pin; -use std::sync::{Arc, Mutex}; -use std::task::{Context, Poll}; - -use failure::Error; -use futures::future::FutureExt; -use tokio::sync::oneshot; - -/// Make a future cancellable. -/// -/// This simply performs a `select()` on the future and something waiting for a signal. If the -/// future finishes successfully, it yields `Some(T::Item)`. If it was cancelled, it'll yield -/// `None`. -/// -/// In order to cancel the future, a `Canceller` is used. -/// -/// ```no_run -/// # use std::future::Future; -/// # use failure::Error; -/// # use futures::future::FutureExt; -/// # use proxmox_backup::tools::futures::Cancellable; -/// # fn doc(future: T) -> Result<(), Error> -/// # where -/// # T: Future + Unpin + Send + Sync + 'static, -/// # { -/// let (future, canceller) = Cancellable::new(future)?; -/// tokio::spawn(future.map(|res| { -/// match res { -/// Some(value) => println!("Future finished with {}", value), -/// None => println!("Future was cancelled"), -/// } -/// })); -/// // Do something -/// canceller.cancel(); -/// # Ok(()) -/// # } -/// ``` -pub struct Cancellable { - /// Our core: we're waiting on a future, on on a lock. The cancel method just unlocks the - /// lock, so that our LockFuture finishes. - inner: futures::future::Select>, - - /// When this future is created, this holds a guard. When a `Canceller` wants to cancel the - /// future, it'll drop this guard, causing our inner future to resolve to `None`. - sender: Arc>>>, -} - -/// Reference to a cancellable future. Multiple instances may exist simultaneously. -/// -/// This allows cancelling another future. If the future already finished, nothing happens. -/// -/// This can be cloned to be used in multiple places. -#[derive(Clone)] -pub struct Canceller(Arc>>>); - -impl Canceller { - /// Cancel the associated future. - /// - /// This does nothing if the future already finished successfully. - pub fn cancel(&self) { - if let Some(sender) = self.0.lock().unwrap().take() { - let _ = sender.send(()); - } - } -} - -impl Cancellable { - /// Make a future cancellable. - /// - /// Returns a future and a `Canceller` which can be cloned and used later to cancel the future. - pub fn new(inner: T) -> Result<(Self, Canceller), Error> { - // we don't even need to store the mutex... - let (tx, rx) = oneshot::channel(); - let this = Self { - inner: futures::future::select(inner, rx), - sender: Arc::new(Mutex::new(Some(tx))), - }; - - let canceller = this.canceller(); - Ok((this, canceller)) - } - - /// Create another `Canceller` for this future. - pub fn canceller(&self) -> Canceller { - Canceller(Arc::clone(&self.sender)) - } -} - -/// Make a future cancellable. -/// -/// This is a shortcut for `Cancellable::new` -pub fn cancellable(future: T) -> Result<(Cancellable, Canceller), Error> { - Cancellable::new(future) -} - -impl Future for Cancellable { - type Output = Option<::Output>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - use futures::future::Either; - match self.inner.poll_unpin(cx) { - Poll::Ready(Either::Left((output, _))) => Poll::Ready(Some(output)), - Poll::Ready(Either::Right(_)) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - } - } -}