]> git.proxmox.com Git - proxmox-backup.git/commitdiff
drop Cancellable future in favor of abortable
authorWolfgang Bumiller <w.bumiller@proxmox.com>
Tue, 17 Dec 2019 09:52:07 +0000 (10:52 +0100)
committerWolfgang Bumiller <w.bumiller@proxmox.com>
Tue, 17 Dec 2019 09:52:07 +0000 (10:52 +0100)
futures-0.3 has a futures::future::abortable() function
which does the exact same, returns an Abortable future with
an AbortHandle providing an abort() method.

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
src/client/backup_reader.rs
src/client/backup_writer.rs
src/client/http_client.rs
src/tools.rs
src/tools/futures.rs [deleted file]

index 2c24ed09911083622324a6eaf6747a77fc0bdb69..9d49ed7fe95e658dfeb14f34eee52fd0ed5d48ea 100644 (file)
@@ -5,11 +5,11 @@ use std::sync::Arc;
 use std::os::unix::fs::OpenOptionsExt;
 
 use chrono::{DateTime, Utc};
+use futures::future::AbortHandle;
 use serde_json::{json, Value};
 
 use proxmox::tools::digest_to_hex;
 
-use crate::tools::futures::Canceller;
 use crate::backup::*;
 
 use super::{HttpClient, H2Client};
@@ -17,21 +17,21 @@ use super::{HttpClient, H2Client};
 /// Backup Reader
 pub struct BackupReader {
     h2: H2Client,
-    canceller: Canceller,
+    abort: AbortHandle,
     crypt_config: Option<Arc<CryptConfig>>,
 }
 
 impl Drop for BackupReader {
 
     fn drop(&mut self) {
-        self.canceller.cancel();
+        self.abort.abort();
     }
 }
 
 impl BackupReader {
 
-    fn new(h2: H2Client, canceller: Canceller, crypt_config: Option<Arc<CryptConfig>>) -> Arc<Self> {
-        Arc::new(Self { h2, canceller, crypt_config})
+    fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>) -> Arc<Self> {
+        Arc::new(Self { h2, abort, crypt_config})
     }
 
     /// Create a new instance by upgrading the connection at '/api2/json/reader'
@@ -54,9 +54,9 @@ impl BackupReader {
         });
         let req = HttpClient::request_builder(client.server(), "GET", "/api2/json/reader", Some(param)).unwrap();
 
-        let (h2, canceller) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())).await?;
+        let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())).await?;
 
-        Ok(BackupReader::new(h2, canceller, crypt_config))
+        Ok(BackupReader::new(h2, abort, crypt_config))
     }
 
     /// Execute a GET request
@@ -119,7 +119,7 @@ impl BackupReader {
     }
 
     pub fn force_close(self) {
-        self.canceller.cancel();
+        self.abort.abort();
     }
 
     /// Download backup manifest (index.json)
index 381cb31e426c239f1bfed4813c4022a5941d1577..510b02483f5623f40c8e28d17199b70ea254be59 100644 (file)
@@ -6,6 +6,7 @@ use failure::*;
 use chrono::{DateTime, Utc};
 use futures::*;
 use futures::stream::Stream;
+use futures::future::AbortHandle;
 use serde_json::{json, Value};
 use tokio::io::AsyncReadExt;
 use tokio::sync::{mpsc, oneshot};
@@ -14,19 +15,18 @@ use proxmox::tools::digest_to_hex;
 
 use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
 use crate::backup::*;
-use crate::tools::futures::Canceller;
 
 use super::{HttpClient, H2Client};
 
 pub struct BackupWriter {
     h2: H2Client,
-    canceller: Canceller,
+    abort: AbortHandle,
 }
 
 impl Drop for BackupWriter {
 
     fn drop(&mut self) {
-        self.canceller.cancel();
+        self.abort.abort();
     }
 }
 
@@ -37,8 +37,8 @@ pub struct BackupStats {
 
 impl BackupWriter {
 
-    fn new(h2: H2Client, canceller: Canceller) -> Arc<Self> {
-        Arc::new(Self { h2, canceller })
+    fn new(h2: H2Client, abort: AbortHandle) -> Arc<Self> {
+        Arc::new(Self { h2, abort })
     }
 
     pub async fn start(
@@ -61,9 +61,9 @@ impl BackupWriter {
         let req = HttpClient::request_builder(
             client.server(), "GET", "/api2/json/backup", Some(param)).unwrap();
 
-        let (h2, canceller) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
+        let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
 
-        Ok(BackupWriter::new(h2, canceller))
+        Ok(BackupWriter::new(h2, abort))
     }
 
     pub async fn get(
@@ -129,13 +129,13 @@ impl BackupWriter {
 
         h2.post("finish", None)
             .map_ok(move |_| {
-                self.canceller.cancel();
+                self.abort.abort();
             })
             .await
     }
 
     pub fn cancel(&self) {
-        self.canceller.cancel();
+        self.abort.abort();
     }
 
     pub async fn upload_blob<R: std::io::Read>(
index ff5a4ec1f80d6a6551cf86aec64fbce73376fd1f..25f3dda94e439bae4150774fc1a0928753943d03 100644 (file)
@@ -20,7 +20,6 @@ use proxmox::tools::{
 
 use super::pipe_to_stream::PipeToSendStream;
 use crate::tools::async_io::EitherStream;
-use crate::tools::futures::{cancellable, Canceller};
 use crate::tools::{self, tty, BroadcastFuture, DEFAULT_ENCODE_SET};
 
 #[derive(Clone)]
@@ -287,7 +286,7 @@ impl HttpClient {
         &self,
         mut req: Request<Body>,
         protocol_name: String,
-    ) -> Result<(H2Client, Canceller), Error> {
+    ) -> Result<(H2Client, futures::future::AbortHandle), Error> {
 
         let auth = self.login().await?;
         let client = self.client.clone();
@@ -323,7 +322,7 @@ impl HttpClient {
         let connection = connection
             .map_err(|_| panic!("HTTP/2.0 connection failed"));
 
-        let (connection, canceller) = cancellable(connection)?;
+        let (connection, abort) = futures::future::abortable(connection);
         // A cancellable future returns an Option which is None when cancelled and
         // Some when it finished instead, since we don't care about the return type we
         // need to map it away:
@@ -334,7 +333,7 @@ impl HttpClient {
 
         // Wait until the `SendRequest` handle has available capacity.
         let c = h2.ready().await?;
-        Ok((H2Client::new(c), canceller))
+        Ok((H2Client::new(c), abort))
     }
 
     async fn credentials(
index c63d837715f47703755904ca8ca9a288b1e2da14..59c92dfaf0493370e76a6f4a0ccbfa48d36d6567 100644 (file)
@@ -23,7 +23,6 @@ pub mod async_io;
 pub mod borrow;
 pub mod daemon;
 pub mod fs;
-pub mod futures;
 pub mod runtime;
 pub mod ticket;
 pub mod timer;
diff --git a/src/tools/futures.rs b/src/tools/futures.rs
deleted file mode 100644 (file)
index 893fd28..0000000
+++ /dev/null
@@ -1,109 +0,0 @@
-//! Provides utilities to deal with futures, such as a `Cancellable` future.
-
-use std::future::Future;
-use std::pin::Pin;
-use std::sync::{Arc, Mutex};
-use std::task::{Context, Poll};
-
-use failure::Error;
-use futures::future::FutureExt;
-use tokio::sync::oneshot;
-
-/// Make a future cancellable.
-///
-/// This simply performs a `select()` on the future and something waiting for a signal. If the
-/// future finishes successfully, it yields `Some(T::Item)`. If it was cancelled, it'll yield
-/// `None`.
-///
-/// In order to cancel the future, a `Canceller` is used.
-///
-/// ```no_run
-/// # use std::future::Future;
-/// # use failure::Error;
-/// # use futures::future::FutureExt;
-/// # use proxmox_backup::tools::futures::Cancellable;
-/// # fn doc<T>(future: T) -> Result<(), Error>
-/// # where
-/// #     T: Future<Output = i32> + Unpin + Send + Sync + 'static,
-/// # {
-/// let (future, canceller) = Cancellable::new(future)?;
-/// tokio::spawn(future.map(|res| {
-///     match res {
-///         Some(value) => println!("Future finished with {}", value),
-///         None => println!("Future was cancelled"),
-///     }
-/// }));
-/// // Do something
-/// canceller.cancel();
-/// # Ok(())
-/// # }
-/// ```
-pub struct Cancellable<T: Future + Unpin> {
-    /// Our core: we're waiting on a future, on on a lock. The cancel method just unlocks the
-    /// lock, so that our LockFuture finishes.
-    inner: futures::future::Select<T, oneshot::Receiver<()>>,
-
-    /// When this future is created, this holds a guard. When a `Canceller` wants to cancel the
-    /// future, it'll drop this guard, causing our inner future to resolve to `None`.
-    sender: Arc<Mutex<Option<oneshot::Sender<()>>>>,
-}
-
-/// Reference to a cancellable future. Multiple instances may exist simultaneously.
-///
-/// This allows cancelling another future. If the future already finished, nothing happens.
-///
-/// This can be cloned to be used in multiple places.
-#[derive(Clone)]
-pub struct Canceller(Arc<Mutex<Option<oneshot::Sender<()>>>>);
-
-impl Canceller {
-    /// Cancel the associated future.
-    ///
-    /// This does nothing if the future already finished successfully.
-    pub fn cancel(&self) {
-        if let Some(sender) = self.0.lock().unwrap().take() {
-            let _ = sender.send(());
-        }
-    }
-}
-
-impl<T: Future + Unpin> Cancellable<T> {
-    /// Make a future cancellable.
-    ///
-    /// Returns a future and a `Canceller` which can be cloned and used later to cancel the future.
-    pub fn new(inner: T) -> Result<(Self, Canceller), Error> {
-        // we don't even need to store the mutex...
-        let (tx, rx) = oneshot::channel();
-        let this = Self {
-            inner: futures::future::select(inner, rx),
-            sender: Arc::new(Mutex::new(Some(tx))),
-        };
-
-        let canceller = this.canceller();
-        Ok((this, canceller))
-    }
-
-    /// Create another `Canceller` for this future.
-    pub fn canceller(&self) -> Canceller {
-        Canceller(Arc::clone(&self.sender))
-    }
-}
-
-/// Make a future cancellable.
-///
-/// This is a shortcut for `Cancellable::new`
-pub fn cancellable<T: Future + Unpin>(future: T) -> Result<(Cancellable<T>, Canceller), Error> {
-    Cancellable::new(future)
-}
-
-impl<T: Future + Unpin> Future for Cancellable<T> {
-    type Output = Option<<T as Future>::Output>;
-    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
-        use futures::future::Either;
-        match self.inner.poll_unpin(cx) {
-            Poll::Ready(Either::Left((output, _))) => Poll::Ready(Some(output)),
-            Poll::Ready(Either::Right(_)) => Poll::Ready(None),
-            Poll::Pending => Poll::Pending,
-        }
-    }
-}