use std::os::unix::fs::OpenOptionsExt;
use chrono::{DateTime, Utc};
+use futures::future::AbortHandle;
use serde_json::{json, Value};
use proxmox::tools::digest_to_hex;
-use crate::tools::futures::Canceller;
use crate::backup::*;
use super::{HttpClient, H2Client};
/// Backup Reader
pub struct BackupReader {
h2: H2Client,
- canceller: Canceller,
+ abort: AbortHandle,
crypt_config: Option<Arc<CryptConfig>>,
}
impl Drop for BackupReader {
fn drop(&mut self) {
- self.canceller.cancel();
+ self.abort.abort();
}
}
impl BackupReader {
- fn new(h2: H2Client, canceller: Canceller, crypt_config: Option<Arc<CryptConfig>>) -> Arc<Self> {
- Arc::new(Self { h2, canceller, crypt_config})
+ fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>) -> Arc<Self> {
+ Arc::new(Self { h2, abort, crypt_config})
}
/// Create a new instance by upgrading the connection at '/api2/json/reader'
});
let req = HttpClient::request_builder(client.server(), "GET", "/api2/json/reader", Some(param)).unwrap();
- let (h2, canceller) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())).await?;
+ let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())).await?;
- Ok(BackupReader::new(h2, canceller, crypt_config))
+ Ok(BackupReader::new(h2, abort, crypt_config))
}
/// Execute a GET request
}
pub fn force_close(self) {
- self.canceller.cancel();
+ self.abort.abort();
}
/// Download backup manifest (index.json)
use chrono::{DateTime, Utc};
use futures::*;
use futures::stream::Stream;
+use futures::future::AbortHandle;
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};
use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
use crate::backup::*;
-use crate::tools::futures::Canceller;
use super::{HttpClient, H2Client};
pub struct BackupWriter {
h2: H2Client,
- canceller: Canceller,
+ abort: AbortHandle,
}
impl Drop for BackupWriter {
fn drop(&mut self) {
- self.canceller.cancel();
+ self.abort.abort();
}
}
impl BackupWriter {
- fn new(h2: H2Client, canceller: Canceller) -> Arc<Self> {
- Arc::new(Self { h2, canceller })
+ fn new(h2: H2Client, abort: AbortHandle) -> Arc<Self> {
+ Arc::new(Self { h2, abort })
}
pub async fn start(
let req = HttpClient::request_builder(
client.server(), "GET", "/api2/json/backup", Some(param)).unwrap();
- let (h2, canceller) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
+ let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
- Ok(BackupWriter::new(h2, canceller))
+ Ok(BackupWriter::new(h2, abort))
}
pub async fn get(
h2.post("finish", None)
.map_ok(move |_| {
- self.canceller.cancel();
+ self.abort.abort();
})
.await
}
pub fn cancel(&self) {
- self.canceller.cancel();
+ self.abort.abort();
}
pub async fn upload_blob<R: std::io::Read>(
use super::pipe_to_stream::PipeToSendStream;
use crate::tools::async_io::EitherStream;
-use crate::tools::futures::{cancellable, Canceller};
use crate::tools::{self, tty, BroadcastFuture, DEFAULT_ENCODE_SET};
#[derive(Clone)]
&self,
mut req: Request<Body>,
protocol_name: String,
- ) -> Result<(H2Client, Canceller), Error> {
+ ) -> Result<(H2Client, futures::future::AbortHandle), Error> {
let auth = self.login().await?;
let client = self.client.clone();
let connection = connection
.map_err(|_| panic!("HTTP/2.0 connection failed"));
- let (connection, canceller) = cancellable(connection)?;
+ let (connection, abort) = futures::future::abortable(connection);
// A cancellable future returns an Option which is None when cancelled and
// Some when it finished instead, since we don't care about the return type we
// need to map it away:
// Wait until the `SendRequest` handle has available capacity.
let c = h2.ready().await?;
- Ok((H2Client::new(c), canceller))
+ Ok((H2Client::new(c), abort))
}
async fn credentials(
pub mod borrow;
pub mod daemon;
pub mod fs;
-pub mod futures;
pub mod runtime;
pub mod ticket;
pub mod timer;
+++ /dev/null
-//! Provides utilities to deal with futures, such as a `Cancellable` future.
-
-use std::future::Future;
-use std::pin::Pin;
-use std::sync::{Arc, Mutex};
-use std::task::{Context, Poll};
-
-use failure::Error;
-use futures::future::FutureExt;
-use tokio::sync::oneshot;
-
-/// Make a future cancellable.
-///
-/// This simply performs a `select()` on the future and something waiting for a signal. If the
-/// future finishes successfully, it yields `Some(T::Item)`. If it was cancelled, it'll yield
-/// `None`.
-///
-/// In order to cancel the future, a `Canceller` is used.
-///
-/// ```no_run
-/// # use std::future::Future;
-/// # use failure::Error;
-/// # use futures::future::FutureExt;
-/// # use proxmox_backup::tools::futures::Cancellable;
-/// # fn doc<T>(future: T) -> Result<(), Error>
-/// # where
-/// # T: Future<Output = i32> + Unpin + Send + Sync + 'static,
-/// # {
-/// let (future, canceller) = Cancellable::new(future)?;
-/// tokio::spawn(future.map(|res| {
-/// match res {
-/// Some(value) => println!("Future finished with {}", value),
-/// None => println!("Future was cancelled"),
-/// }
-/// }));
-/// // Do something
-/// canceller.cancel();
-/// # Ok(())
-/// # }
-/// ```
-pub struct Cancellable<T: Future + Unpin> {
- /// Our core: we're waiting on a future, on on a lock. The cancel method just unlocks the
- /// lock, so that our LockFuture finishes.
- inner: futures::future::Select<T, oneshot::Receiver<()>>,
-
- /// When this future is created, this holds a guard. When a `Canceller` wants to cancel the
- /// future, it'll drop this guard, causing our inner future to resolve to `None`.
- sender: Arc<Mutex<Option<oneshot::Sender<()>>>>,
-}
-
-/// Reference to a cancellable future. Multiple instances may exist simultaneously.
-///
-/// This allows cancelling another future. If the future already finished, nothing happens.
-///
-/// This can be cloned to be used in multiple places.
-#[derive(Clone)]
-pub struct Canceller(Arc<Mutex<Option<oneshot::Sender<()>>>>);
-
-impl Canceller {
- /// Cancel the associated future.
- ///
- /// This does nothing if the future already finished successfully.
- pub fn cancel(&self) {
- if let Some(sender) = self.0.lock().unwrap().take() {
- let _ = sender.send(());
- }
- }
-}
-
-impl<T: Future + Unpin> Cancellable<T> {
- /// Make a future cancellable.
- ///
- /// Returns a future and a `Canceller` which can be cloned and used later to cancel the future.
- pub fn new(inner: T) -> Result<(Self, Canceller), Error> {
- // we don't even need to store the mutex...
- let (tx, rx) = oneshot::channel();
- let this = Self {
- inner: futures::future::select(inner, rx),
- sender: Arc::new(Mutex::new(Some(tx))),
- };
-
- let canceller = this.canceller();
- Ok((this, canceller))
- }
-
- /// Create another `Canceller` for this future.
- pub fn canceller(&self) -> Canceller {
- Canceller(Arc::clone(&self.sender))
- }
-}
-
-/// Make a future cancellable.
-///
-/// This is a shortcut for `Cancellable::new`
-pub fn cancellable<T: Future + Unpin>(future: T) -> Result<(Cancellable<T>, Canceller), Error> {
- Cancellable::new(future)
-}
-
-impl<T: Future + Unpin> Future for Cancellable<T> {
- type Output = Option<<T as Future>::Output>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
- use futures::future::Either;
- match self.inner.poll_unpin(cx) {
- Poll::Ready(Either::Left((output, _))) => Poll::Ready(Some(output)),
- Poll::Ready(Either::Right(_)) => Poll::Ready(None),
- Poll::Pending => Poll::Pending,
- }
- }
-}