]> git.proxmox.com Git - proxmox-backup.git/commitdiff
move more tools for the client into subcrates
authorWolfgang Bumiller <w.bumiller@proxmox.com>
Thu, 15 Jul 2021 10:15:50 +0000 (12:15 +0200)
committerWolfgang Bumiller <w.bumiller@proxmox.com>
Mon, 19 Jul 2021 08:07:12 +0000 (10:07 +0200)
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
33 files changed:
Cargo.toml
pbs-tools/Cargo.toml
pbs-tools/src/auth.rs [new file with mode: 0644]
pbs-tools/src/broadcast_future.rs [new file with mode: 0644]
pbs-tools/src/cert.rs [new file with mode: 0644]
pbs-tools/src/lib.rs
pbs-tools/src/percent_encoding.rs [new file with mode: 0644]
pbs-tools/src/sync/mod.rs [new file with mode: 0644]
pbs-tools/src/sync/std_channel_writer.rs [new file with mode: 0644]
pbs-tools/src/tokio/mod.rs [new file with mode: 0644]
pbs-tools/src/tokio/tokio_writer_adapter.rs [new file with mode: 0644]
src/api2/access/mod.rs
src/api2/access/openid.rs
src/api2/node/certificates.rs
src/api2/node/mod.rs
src/api2/node/status.rs
src/auth_helpers.rs
src/bin/proxmox-backup-api.rs
src/bin/proxmox-backup-client.rs
src/bin/proxmox-backup-manager.rs
src/bin/proxmox_backup_client/task.rs
src/bin/proxmox_backup_manager/cert.rs
src/client/backup_writer.rs
src/client/http_client.rs
src/client/mod.rs
src/client/pxar_backup_stream.rs
src/client/task_log.rs
src/client/vsock_client.rs
src/tools/broadcast_future.rs [deleted file]
src/tools/cert.rs [deleted file]
src/tools/mod.rs
src/tools/std_channel_writer.rs [deleted file]
src/tools/tokio_writer_adapter.rs [deleted file]

index 71deacd4a39863b96d5509070f51f0bac746ac5c..9c41605ac1f3b262be802bb55cbd5cc1857d1c7c 100644 (file)
@@ -44,7 +44,6 @@ endian_trait = { version = "0.6", features = ["arrays"] }
 env_logger = "0.7"
 flate2 = "1.0"
 anyhow = "1.0"
-foreign-types = "0.3"
 thiserror = "1.0"
 futures = "0.3"
 h2 = { version = "0.3", features = [ "stream" ] }
index a3aa81c1fef95013e9e79f6a0d6bc7565f3c9cc1..ab80666c5850cf3184081a2aec4db2c525983188 100644 (file)
@@ -9,6 +9,9 @@ description = "common tools used throughout pbs"
 [dependencies]
 anyhow = "1.0"
 base64 = "0.12"
+foreign-types = "0.3"
+futures = "0.3"
+lazy_static = "1.4"
 libc = "0.2"
 nix = "0.19.1"
 nom = "5.1"
@@ -17,6 +20,10 @@ percent-encoding = "2.1"
 regex = "1.2"
 serde = "1.0"
 serde_json = "1.0"
+# rt-multi-thread is required for block_in_place
+tokio = { version = "1.6", features = [ "rt", "rt-multi-thread", "sync" ] }
 url = "2.1"
 
 proxmox = { version = "0.11.5", default-features = false, features = [] }
+
+pbs-buildcfg = { path = "../pbs-buildcfg" }
diff --git a/pbs-tools/src/auth.rs b/pbs-tools/src/auth.rs
new file mode 100644 (file)
index 0000000..6e605dd
--- /dev/null
@@ -0,0 +1,26 @@
+//! Helpers for authentication used by both client and server.
+
+use anyhow::Error;
+use lazy_static::lazy_static;
+use openssl::pkey::{PKey, Private};
+use openssl::rsa::Rsa;
+
+use proxmox::tools::fs::file_get_contents;
+
+use pbs_buildcfg::configdir;
+
+fn load_private_auth_key() -> Result<PKey<Private>, Error> {
+    let pem = file_get_contents(configdir!("/authkey.key"))?;
+    let rsa = Rsa::private_key_from_pem(&pem)?;
+    let key = PKey::from_rsa(rsa)?;
+
+    Ok(key)
+}
+
+pub fn private_auth_key() -> &'static PKey<Private> {
+    lazy_static! {
+        static ref KEY: PKey<Private> = load_private_auth_key().unwrap();
+    }
+
+    &KEY
+}
diff --git a/pbs-tools/src/broadcast_future.rs b/pbs-tools/src/broadcast_future.rs
new file mode 100644 (file)
index 0000000..7bfd83b
--- /dev/null
@@ -0,0 +1,180 @@
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::{Arc, Mutex};
+
+use anyhow::{format_err, Error};
+use futures::future::{FutureExt, TryFutureExt};
+use tokio::sync::oneshot;
+
+/// Broadcast results to registered listeners using asnyc oneshot channels
+#[derive(Default)]
+pub struct BroadcastData<T> {
+    result: Option<Result<T, String>>,
+    listeners: Vec<oneshot::Sender<Result<T, Error>>>,
+}
+
+impl <T: Clone> BroadcastData<T> {
+
+    pub fn new() -> Self {
+        Self {
+            result: None,
+            listeners: vec![],
+        }
+    }
+
+    pub fn notify_listeners(&mut self, result: Result<T, String>) {
+
+        self.result = Some(result.clone());
+
+        loop {
+            match self.listeners.pop() {
+                None => { break; },
+                Some(ch) => {
+                    match &result {
+                        Ok(result) => { let _ = ch.send(Ok(result.clone())); },
+                        Err(err) => { let _ = ch.send(Err(format_err!("{}", err))); },
+                    }
+                },
+            }
+        }
+    }
+
+    pub fn listen(&mut self) -> impl Future<Output = Result<T, Error>> {
+        use futures::future::{ok, Either};
+
+        match &self.result {
+            None => {},
+            Some(Ok(result)) => return Either::Left(ok(result.clone())),
+            Some(Err(err)) => return Either::Left(futures::future::err(format_err!("{}", err))),
+        }
+
+        let (tx, rx) = oneshot::channel::<Result<T, Error>>();
+
+        self.listeners.push(tx);
+
+        Either::Right(rx
+            .map(|res| match res {
+                Ok(Ok(t)) => Ok(t),
+                Ok(Err(e)) => Err(e),
+                Err(e) => Err(Error::from(e)),
+            })
+        )
+    }
+}
+
+type SourceFuture<T> = Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>;
+
+struct BroadCastFutureBinding<T> {
+    broadcast: BroadcastData<T>,
+    future: Option<SourceFuture<T>>,
+}
+
+/// Broadcast future results to registered listeners
+pub struct BroadcastFuture<T> {
+    inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
+}
+
+impl<T: Clone + Send + 'static> BroadcastFuture<T> {
+    /// Create instance for specified source future.
+    ///
+    /// The result of the future is sent to all registered listeners.
+    pub fn new(source: Box<dyn Future<Output = Result<T, Error>> + Send>) -> Self {
+        let inner = BroadCastFutureBinding {
+            broadcast: BroadcastData::new(),
+            future: Some(Pin::from(source)),
+        };
+        Self { inner: Arc::new(Mutex::new(inner)) }
+    }
+
+    /// Creates a new instance with a oneshot channel as trigger
+    pub fn new_oneshot() -> (Self, oneshot::Sender<Result<T, Error>>) {
+
+        let (tx, rx) = oneshot::channel::<Result<T, Error>>();
+        let rx = rx
+            .map_err(Error::from)
+            .and_then(futures::future::ready);
+
+        (Self::new(Box::new(rx)), tx)
+    }
+
+    fn notify_listeners(
+        inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
+        result: Result<T, String>,
+    ) {
+        let mut data = inner.lock().unwrap();
+        data.broadcast.notify_listeners(result);
+    }
+
+    fn spawn(inner: Arc<Mutex<BroadCastFutureBinding<T>>>) -> impl Future<Output = Result<T, Error>> {
+        let mut data = inner.lock().unwrap();
+
+        if let Some(source) = data.future.take() {
+
+            let inner1 = inner.clone();
+
+            let task = source.map(move |value| {
+                match value {
+                    Ok(value) => Self::notify_listeners(inner1, Ok(value)),
+                    Err(err) => Self::notify_listeners(inner1, Err(err.to_string())),
+                }
+            });
+            tokio::spawn(task);
+        }
+
+        data.broadcast.listen()
+    }
+
+    /// Register a listener
+    pub fn listen(&self) -> impl Future<Output = Result<T, Error>> {
+        let inner2 = self.inner.clone();
+        async move { Self::spawn(inner2).await }
+    }
+}
+
+#[test]
+fn test_broadcast_future() {
+    use std::sync::atomic::{AtomicUsize, Ordering};
+
+    static CHECKSUM: AtomicUsize  = AtomicUsize::new(0);
+
+    let (sender, trigger) = BroadcastFuture::new_oneshot();
+
+    let receiver1 = sender.listen()
+        .map_ok(|res| {
+            CHECKSUM.fetch_add(res, Ordering::SeqCst);
+        })
+        .map_err(|err| { panic!("got error {}", err); })
+        .map(|_| ());
+
+    let receiver2 = sender.listen()
+        .map_ok(|res| {
+            CHECKSUM.fetch_add(res*2, Ordering::SeqCst);
+        })
+        .map_err(|err| { panic!("got error {}", err); })
+        .map(|_| ());
+
+    let rt = tokio::runtime::Runtime::new().unwrap();
+    rt.block_on(async move {
+        let r1 = tokio::spawn(receiver1);
+        let r2 = tokio::spawn(receiver2);
+
+        trigger.send(Ok(1)).unwrap();
+        let _ = r1.await;
+        let _ = r2.await;
+    });
+
+    let result = CHECKSUM.load(Ordering::SeqCst);
+
+    assert_eq!(result, 3);
+
+    // the result stays available until the BroadcastFuture is dropped
+    rt.block_on(sender.listen()
+        .map_ok(|res| {
+            CHECKSUM.fetch_add(res*4, Ordering::SeqCst);
+        })
+        .map_err(|err| { panic!("got error {}", err); })
+        .map(|_| ()));
+
+    let result = CHECKSUM.load(Ordering::SeqCst);
+    assert_eq!(result, 7);
+}
diff --git a/pbs-tools/src/cert.rs b/pbs-tools/src/cert.rs
new file mode 100644 (file)
index 0000000..cef04fe
--- /dev/null
@@ -0,0 +1,104 @@
+use std::path::PathBuf;
+use std::mem::MaybeUninit;
+
+use anyhow::{bail, format_err, Error};
+use foreign_types::ForeignTypeRef;
+use openssl::x509::{X509, GeneralName};
+use openssl::stack::Stack;
+use openssl::pkey::{Public, PKey};
+
+use pbs_buildcfg::configdir;
+
+// C type:
+#[allow(non_camel_case_types)]
+type ASN1_TIME = <openssl::asn1::Asn1TimeRef as ForeignTypeRef>::CType;
+
+extern "C" {
+    fn ASN1_TIME_to_tm(s: *const ASN1_TIME, tm: *mut libc::tm) -> libc::c_int;
+}
+
+fn asn1_time_to_unix(time: &openssl::asn1::Asn1TimeRef) -> Result<i64, Error> {
+    let mut c_tm = MaybeUninit::<libc::tm>::uninit();
+    let rc = unsafe { ASN1_TIME_to_tm(time.as_ptr(), c_tm.as_mut_ptr()) };
+    if rc != 1 {
+        bail!("failed to parse ASN1 time");
+    }
+    let mut c_tm = unsafe { c_tm.assume_init() };
+    proxmox::tools::time::timegm(&mut c_tm)
+}
+
+pub struct CertInfo {
+    x509: X509,
+}
+
+fn x509name_to_string(name: &openssl::x509::X509NameRef) -> Result<String, Error> {
+    let mut parts = Vec::new();
+    for entry in name.entries() {
+        parts.push(format!("{} = {}", entry.object().nid().short_name()?, entry.data().as_utf8()?));
+    }
+    Ok(parts.join(", "))
+}
+
+impl CertInfo {
+    pub fn new() -> Result<Self, Error> {
+        Self::from_path(PathBuf::from(configdir!("/proxy.pem")))
+    }
+
+    pub fn from_path(path: PathBuf) -> Result<Self, Error> {
+        Self::from_pem(&proxmox::tools::fs::file_get_contents(&path)?)
+            .map_err(|err| format_err!("failed to load certificate from {:?} - {}", path, err))
+    }
+
+    pub fn from_pem(cert_pem: &[u8]) -> Result<Self, Error> {
+        let x509 = openssl::x509::X509::from_pem(&cert_pem)?;
+        Ok(Self{
+            x509
+        })
+    }
+
+    pub fn subject_alt_names(&self) -> Option<Stack<GeneralName>> {
+        self.x509.subject_alt_names()
+    }
+
+    pub fn subject_name(&self) -> Result<String, Error> {
+        Ok(x509name_to_string(self.x509.subject_name())?)
+    }
+
+    pub fn issuer_name(&self) -> Result<String, Error> {
+        Ok(x509name_to_string(self.x509.issuer_name())?)
+    }
+
+    pub fn fingerprint(&self) -> Result<String, Error> {
+        let fp = self.x509.digest(openssl::hash::MessageDigest::sha256())?;
+        let fp_string = proxmox::tools::digest_to_hex(&fp);
+        let fp_string = fp_string.as_bytes().chunks(2).map(|v| std::str::from_utf8(v).unwrap())
+            .collect::<Vec<&str>>().join(":");
+        Ok(fp_string)
+    }
+
+    pub fn public_key(&self) -> Result<PKey<Public>, Error> {
+        let pubkey = self.x509.public_key()?;
+        Ok(pubkey)
+    }
+
+    pub fn not_before(&self) -> &openssl::asn1::Asn1TimeRef {
+        self.x509.not_before()
+    }
+
+    pub fn not_after(&self) -> &openssl::asn1::Asn1TimeRef {
+        self.x509.not_after()
+    }
+
+    pub fn not_before_unix(&self) -> Result<i64, Error> {
+        asn1_time_to_unix(&self.not_before())
+    }
+
+    pub fn not_after_unix(&self) -> Result<i64, Error> {
+        asn1_time_to_unix(&self.not_after())
+    }
+
+    /// Check if the certificate is expired at or after a specific unix epoch.
+    pub fn is_expired_after_epoch(&self, epoch: i64) -> Result<bool, Error> {
+        Ok(self.not_after_unix()? < epoch)
+    }
+}
index 72b0e9fdce22ae873c12d9cf1e3629bd207447be..c9d95dd9dcd1fdc158699a619780778b525654c3 100644 (file)
@@ -1,12 +1,18 @@
+pub mod auth;
 pub mod borrow;
+pub mod broadcast_future;
+pub mod cert;
 pub mod format;
 pub mod fs;
 pub mod json;
 pub mod nom;
+pub mod percent_encoding;
 pub mod process_locker;
 pub mod sha;
 pub mod str;
+pub mod sync;
 pub mod ticket;
+pub mod tokio;
 
 mod command;
 pub use command::{command_output, command_output_as_string, run_command};
diff --git a/pbs-tools/src/percent_encoding.rs b/pbs-tools/src/percent_encoding.rs
new file mode 100644 (file)
index 0000000..afe011e
--- /dev/null
@@ -0,0 +1,22 @@
+use percent_encoding::{utf8_percent_encode, AsciiSet};
+
+/// This used to be: `SIMPLE_ENCODE_SET` plus space, `"`, `#`, `<`, `>`, backtick, `?`, `{`, `}`
+pub const DEFAULT_ENCODE_SET: &AsciiSet = &percent_encoding::CONTROLS // 0..1f and 7e
+    // The SIMPLE_ENCODE_SET adds space and anything >= 0x7e (7e itself is already included above)
+    .add(0x20)
+    .add(0x7f)
+    // the DEFAULT_ENCODE_SET added:
+    .add(b' ')
+    .add(b'"')
+    .add(b'#')
+    .add(b'<')
+    .add(b'>')
+    .add(b'`')
+    .add(b'?')
+    .add(b'{')
+    .add(b'}');
+
+/// percent encode a url component
+pub fn percent_encode_component(comp: &str) -> String {
+    utf8_percent_encode(comp, percent_encoding::NON_ALPHANUMERIC).to_string()
+}
diff --git a/pbs-tools/src/sync/mod.rs b/pbs-tools/src/sync/mod.rs
new file mode 100644 (file)
index 0000000..fdd5eda
--- /dev/null
@@ -0,0 +1,2 @@
+mod std_channel_writer;
+pub use std_channel_writer::StdChannelWriter;
diff --git a/pbs-tools/src/sync/std_channel_writer.rs b/pbs-tools/src/sync/std_channel_writer.rs
new file mode 100644 (file)
index 0000000..d2f6444
--- /dev/null
@@ -0,0 +1,28 @@
+use std::io::Write;
+use std::sync::mpsc::SyncSender;
+
+use anyhow::{Error};
+
+/// Wrapper around SyncSender, which implements Write
+///
+/// Each write in translated into a send(Vec<u8>).
+pub struct StdChannelWriter(SyncSender<Result<Vec<u8>, Error>>);
+
+impl StdChannelWriter {
+    pub fn new(sender: SyncSender<Result<Vec<u8>, Error>>) -> Self {
+        Self(sender)
+    }
+}
+
+impl Write for StdChannelWriter {
+    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
+        self.0
+            .send(Ok(buf.to_vec()))
+            .map_err(proxmox::sys::error::io_err_other)
+            .and(Ok(buf.len()))
+    }
+
+    fn flush(&mut self) -> Result<(), std::io::Error> {
+        Ok(())
+    }
+}
diff --git a/pbs-tools/src/tokio/mod.rs b/pbs-tools/src/tokio/mod.rs
new file mode 100644 (file)
index 0000000..43fc107
--- /dev/null
@@ -0,0 +1,2 @@
+pub mod tokio_writer_adapter;
+pub use tokio_writer_adapter::TokioWriterAdapter;
diff --git a/pbs-tools/src/tokio/tokio_writer_adapter.rs b/pbs-tools/src/tokio/tokio_writer_adapter.rs
new file mode 100644 (file)
index 0000000..7b7f5dc
--- /dev/null
@@ -0,0 +1,26 @@
+use std::io::Write;
+
+use tokio::task::block_in_place;
+
+/// Wrapper around a writer which implements Write
+///
+/// wraps each write with a 'block_in_place' so that
+/// any (blocking) writer can be safely used in async context in a
+/// tokio runtime
+pub struct TokioWriterAdapter<W: Write>(W);
+
+impl<W: Write> TokioWriterAdapter<W> {
+    pub fn new(writer: W) -> Self {
+        Self(writer)
+    }
+}
+
+impl<W: Write> Write for TokioWriterAdapter<W> {
+    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
+        block_in_place(|| self.0.write(buf))
+    }
+
+    fn flush(&mut self) -> Result<(), std::io::Error> {
+        block_in_place(|| self.0.flush())
+    }
+}
index c6bfbb9e86bb9788e917ddba5b55af3ce655d7a5..32dfe9de31a0a03256fc3ee0ac81bebed0e05e6a 100644 (file)
@@ -11,6 +11,7 @@ use proxmox::api::{api, Permission, RpcEnvironment};
 use proxmox::{http_err, list_subdirs_api_method};
 use proxmox::{identity, sortable};
 
+use pbs_tools::auth::private_auth_key;
 use pbs_tools::ticket::{self, Empty, Ticket};
 
 use crate::api2::types::*;
index a778aa2a58fc4c82bacab5abe417bd4ddbffb775..783864cc9461410b2052d6d388daf71073b8dc1f 100644 (file)
@@ -14,6 +14,7 @@ use proxmox::tools::fs::open_file_locked;
 use proxmox_openid::{OpenIdAuthenticator,  OpenIdConfig};
 
 use pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M;
+use pbs_tools::auth::private_auth_key;
 use pbs_tools::ticket::Ticket;
 
 use crate::server::ticket::ApiTicket;
index 34842673efc3fd42d59b54a89a3dcbf07b2d6d35..b249b22dfb5a144d5d1cf949b84ee3b1e3221123 100644 (file)
@@ -12,6 +12,7 @@ use proxmox::api::{api, Permission, Router, RpcEnvironment};
 use proxmox::list_subdirs_api_method;
 
 use pbs_buildcfg::configdir;
+use pbs_tools::cert;
 
 use crate::acme::AcmeClient;
 use crate::api2::types::Authid;
@@ -20,7 +21,6 @@ use crate::api2::types::AcmeDomain;
 use crate::config::acl::PRIV_SYS_MODIFY;
 use crate::config::node::NodeConfig;
 use crate::server::WorkerTask;
-use crate::tools::cert;
 
 pub const ROUTER: Router = Router::new()
     .get(&list_subdirs_api_method!(SUBDIRS))
index 208cbf98c79f45223388aac187375b4f6f25fd8d..b9980ef771d705ed78cac5e622857aab5aa28daf 100644 (file)
@@ -20,6 +20,7 @@ use proxmox::list_subdirs_api_method;
 use proxmox_http::websocket::WebSocket;
 use proxmox::{identity, sortable};
 
+use pbs_tools::auth::private_auth_key;
 use pbs_tools::ticket::{self, Empty, Ticket};
 
 use crate::api2::types::*;
@@ -121,7 +122,7 @@ async fn termproxy(
 
     let ticket = Ticket::new(ticket::TERM_PREFIX, &Empty)?
         .sign(
-            crate::auth_helpers::private_auth_key(),
+            private_auth_key(),
             Some(&tools::ticket::term_aad(&userid, &path, port)),
         )?;
 
index a82c0c8a9dece331cdb92a86e9d79619bff376fa..12f6dc71065d13b58787cf2f4a821432ca17e86c 100644 (file)
@@ -8,9 +8,10 @@ use proxmox::sys::linux::procfs;
 
 use proxmox::api::{api, ApiMethod, Router, RpcEnvironment, Permission};
 
+use pbs_tools::cert::CertInfo;
+
 use crate::api2::types::*;
 use crate::config::acl::{PRIV_SYS_AUDIT, PRIV_SYS_POWER_MANAGEMENT};
-use crate::tools::cert::CertInfo;
 
 impl std::convert::From<procfs::ProcFsCPUInfo> for NodeCpuInformation {
     fn from(info: procfs::ProcFsCPUInfo) -> Self {
index 33d142b304550a2de87d03e4cd86c5f297a542cb..15e782a5ba806e19e3d9dabafaf93358e3e37ec0 100644 (file)
@@ -1,18 +1,16 @@
+use std::path::PathBuf;
+
 use anyhow::{bail, format_err, Error};
 use lazy_static::lazy_static;
-
-use openssl::rsa::{Rsa};
-use openssl::pkey::{PKey, Public, Private};
+use openssl::pkey::{PKey, Public};
+use openssl::rsa::Rsa;
 use openssl::sha;
 
-use std::path::PathBuf;
-
 use proxmox::tools::fs::{file_get_contents, replace_file, CreateOptions};
 use proxmox::try_block;
 
 use pbs_buildcfg::configdir;
-
-use crate::api2::types::Userid;
+use pbs_api_types::Userid;
 
 fn compute_csrf_secret_digest(
     timestamp: i64,
@@ -155,24 +153,6 @@ pub fn csrf_secret() -> &'static [u8] {
     &SECRET
 }
 
-fn load_private_auth_key() -> Result<PKey<Private>, Error> {
-
-    let pem = file_get_contents(configdir!("/authkey.key"))?;
-    let rsa = Rsa::private_key_from_pem(&pem)?;
-    let key = PKey::from_rsa(rsa)?;
-
-    Ok(key)
-}
-
-pub fn private_auth_key() -> &'static PKey<Private> {
-
-    lazy_static! {
-        static ref KEY: PKey<Private> = load_private_auth_key().unwrap();
-    }
-
-    &KEY
-}
-
 fn load_public_auth_key() -> Result<PKey<Public>, Error> {
 
     let pem = file_get_contents(configdir!("/authkey.pub"))?;
index a8fbbadb9b6f7fe6ccff568496692c2a97b167e4..75104205701b0bd27d5aac90f6d575437873615b 100644 (file)
@@ -4,8 +4,8 @@ use futures::*;
 use proxmox::try_block;
 use proxmox::api::RpcEnvironmentType;
 
-//use proxmox_backup::tools;
-//use proxmox_backup::api_schema::config::*;
+use pbs_tools::auth::private_auth_key;
+
 use proxmox_backup::server::{
     self,
     auth::default_api_auth,
index a5810538ed0f39a46951f9aa1878ce6dd2334e79..3a8a42a092ca40cf2f796d18544c00fe8af3fc56 100644 (file)
@@ -28,12 +28,9 @@ use proxmox::{
 use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
 
 use pbs_datastore::catalog::BackupCatalogWriter;
+use pbs_tools::sync::StdChannelWriter;
+use pbs_tools::tokio::TokioWriterAdapter;
 
-use proxmox_backup::tools::{
-    self,
-    StdChannelWriter,
-    TokioWriterAdapter,
-};
 use proxmox_backup::api2::types::*;
 use proxmox_backup::api2::version;
 use proxmox_backup::client::*;
@@ -64,6 +61,7 @@ use proxmox_backup::backup::{
     Shell,
     PruneOptions,
 };
+use proxmox_backup::tools;
 
 mod proxmox_backup_client;
 use proxmox_backup_client::*;
index 79f80513d98dae313ce5ecd6ad49b0ff1751b75f..228d7ed13093e98aefc614dd588eb08511820d38 100644 (file)
@@ -6,6 +6,8 @@ use serde_json::{json, Value};
 
 use proxmox::api::{api, cli::*, RpcEnvironment};
 
+use pbs_tools::percent_encoding::percent_encode_component;
+
 use proxmox_backup::tools;
 use proxmox_backup::config;
 use proxmox_backup::api2::{self, types::* };
@@ -188,7 +190,7 @@ async fn task_stop(param: Value) -> Result<Value, Error> {
 
     let mut client = connect_to_localhost()?;
 
-    let path = format!("api2/json/nodes/localhost/tasks/{}", tools::percent_encode_component(upid_str));
+    let path = format!("api2/json/nodes/localhost/tasks/{}", percent_encode_component(upid_str));
     let _ = client.delete(&path, None).await?;
 
     Ok(Value::Null)
index e76839322b1cb1ab3fe0a67b7f492ff2db08b32d..e6fcc74ea94657d417fcd39135ba6d0998191109 100644 (file)
@@ -3,6 +3,8 @@ use serde_json::{json, Value};
 
 use proxmox::api::{api, cli::*};
 
+use pbs_tools::percent_encoding::percent_encode_component;
+
 use proxmox_backup::tools;
 
 use proxmox_backup::client::*;
@@ -125,7 +127,7 @@ async fn task_stop(param: Value) -> Result<Value, Error> {
 
     let mut client = connect(&repo)?;
 
-    let path = format!("api2/json/nodes/localhost/tasks/{}", tools::percent_encode_component(upid_str));
+    let path = format!("api2/json/nodes/localhost/tasks/{}", percent_encode_component(upid_str));
     let _ = client.delete(&path, None).await?;
 
     Ok(Value::Null)
index 845c8edc2b6d3ac14e03efe065d6ad8151651717..c570572cec170885cf7de92e52a37c84b4cf709f 100644 (file)
@@ -2,9 +2,10 @@ use anyhow::{bail, Error};
 
 use proxmox::api::{api, cli::*};
 
+use pbs_tools::cert::CertInfo;
+
 use proxmox_backup::config;
 use proxmox_backup::auth_helpers::*;
-use proxmox_backup::tools::cert::CertInfo;
 
 #[api]
 /// Display node certificate information.
index 5ab38b777198f2446927139883f8ba68d2d66d37..8b3ddefaed30ebb2f155dca15a110b7c12aba0fa 100644 (file)
@@ -14,10 +14,15 @@ use tokio_stream::wrappers::ReceiverStream;
 
 use proxmox::tools::digest_to_hex;
 
+use pbs_datastore::{CATALOG_NAME, CryptConfig};
+use pbs_datastore::data_blob::{ChunkInfo, DataBlob, DataChunkBuilder};
+use pbs_datastore::dynamic_index::DynamicIndexReader;
+use pbs_datastore::fixed_index::FixedIndexReader;
+use pbs_datastore::index::IndexFile;
+use pbs_datastore::manifest::{ArchiveType, BackupManifest, MANIFEST_BLOB_NAME};
 use pbs_tools::format::HumanByte;
 
 use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
-use crate::backup::*;
 
 use super::{H2Client, HttpClient};
 
@@ -283,7 +288,7 @@ impl BackupWriter {
 
         if let Some(manifest) = options.previous_manifest {
             // try, but ignore errors
-            match archive_type(archive_name) {
+            match ArchiveType::from_path(archive_name) {
                 Ok(ArchiveType::FixedIndex) => {
                     let _ = self
                         .download_previous_fixed_index(
index d19fa5c25a370afb00a01c5bfd7a1886f0c4b809..a83b8d3c7cad47e037c78645bf48c0ef611929d0 100644 (file)
@@ -24,15 +24,13 @@ use proxmox_http::client::HttpsConnector;
 use proxmox_http::uri::build_authority;
 
 use pbs_api_types::{Authid, Userid};
+use pbs_tools::broadcast_future::BroadcastFuture;
 use pbs_tools::json::json_object_to_query;
 use pbs_tools::ticket;
+use pbs_tools::percent_encoding::DEFAULT_ENCODE_SET;
 
 use super::pipe_to_stream::PipeToSendStream;
-use crate::tools::{
-    BroadcastFuture,
-    DEFAULT_ENCODE_SET,
-    PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
-};
+use super::PROXMOX_BACKUP_TCP_KEEPALIVE_TIME;
 
 /// Timeout used for several HTTP operations that are expected to finish quickly but may block in
 /// certain error conditions. Keep it generous, to avoid false-positive under high load.
index 8a4e45567ce317a49d624991334ba02ff69e6335..8ff000186683f914f8d3bb951b644767f5e9321e 100644 (file)
@@ -7,11 +7,8 @@ use anyhow::Error;
 
 use pbs_api_types::{Authid, Userid};
 use pbs_tools::ticket::Ticket;
-
-use crate::{
-    tools::cert::CertInfo,
-    auth_helpers::private_auth_key,
-};
+use pbs_tools::cert::CertInfo;
+use pbs_tools::auth::private_auth_key;
 
 mod merge_known_chunks;
 pub mod pipe_to_stream;
@@ -43,6 +40,8 @@ pub use backup_repo::*;
 mod backup_specification;
 pub use backup_specification::*;
 
+pub const PROXMOX_BACKUP_TCP_KEEPALIVE_TIME: u32 = 120;
+
 /// Connect to localhost:8007 as root@pam
 ///
 /// This automatically creates a ticket if run as 'root' user.
index 86bc858335838dc25ef4fc70140b24844b8d519d..d39eb6c4dd4b69cee45a7c58e51666df3793b9e0 100644 (file)
@@ -13,11 +13,8 @@ use nix::fcntl::OFlag;
 use nix::sys::stat::Mode;
 
 use pbs_datastore::catalog::CatalogWriter;
-
-use crate::tools::{
-    StdChannelWriter,
-    TokioWriterAdapter,
-};
+use pbs_tools::sync::StdChannelWriter;
+use pbs_tools::tokio::TokioWriterAdapter;
 
 /// Stream implementation to encode and upload .pxar archives.
 ///
index 695e6f3bab29904071c93ca258e501e9b48a4c15..6350e83afd47905f307c1e05ad277956c9cd446c 100644 (file)
@@ -7,15 +7,10 @@ use futures::*;
 
 use proxmox::api::cli::format_and_print_result;
 
-use super::HttpClient;
-use crate::{
-    server::{
-        worker_is_active_local,
-        UPID,
-    },
-    tools,
-};
+use pbs_tools::percent_encoding::percent_encode_component;
 
+use super::HttpClient;
+use crate::server::{UPID, worker_is_active_local};
 
 /// Display task log on console
 ///
@@ -54,13 +49,13 @@ pub async fn display_task_log(
 
             let abort = abort_count.load(Ordering::Relaxed);
             if abort > 0 {
-                let path = format!("api2/json/nodes/localhost/tasks/{}", tools::percent_encode_component(upid_str));
+                let path = format!("api2/json/nodes/localhost/tasks/{}", percent_encode_component(upid_str));
                 let _ = client.delete(&path, None).await?;
             }
 
             let param = json!({ "start": start, "limit": limit, "test-status": true });
 
-            let path = format!("api2/json/nodes/localhost/tasks/{}/log", tools::percent_encode_component(upid_str));
+            let path = format!("api2/json/nodes/localhost/tasks/{}/log", percent_encode_component(upid_str));
             let result = client.get(&path, Some(param)).await?;
 
             let active = result["active"].as_bool().unwrap();
index d735b6ead296f79758d169264e22904805f5d054..3f0f373824260adf3e553e7b2cfac718039105f3 100644 (file)
@@ -1,10 +1,8 @@
-use anyhow::{bail, format_err, Error};
-use futures::*;
-
-use core::task::Context;
 use std::pin::Pin;
-use std::task::Poll;
+use std::task::{Context, Poll};
 
+use anyhow::{bail, format_err, Error};
+use futures::*;
 use http::Uri;
 use http::{Request, Response};
 use hyper::client::connect::{Connected, Connection};
diff --git a/src/tools/broadcast_future.rs b/src/tools/broadcast_future.rs
deleted file mode 100644 (file)
index 7bfd83b..0000000
+++ /dev/null
@@ -1,180 +0,0 @@
-use std::future::Future;
-use std::pin::Pin;
-use std::sync::{Arc, Mutex};
-
-use anyhow::{format_err, Error};
-use futures::future::{FutureExt, TryFutureExt};
-use tokio::sync::oneshot;
-
-/// Broadcast results to registered listeners using asnyc oneshot channels
-#[derive(Default)]
-pub struct BroadcastData<T> {
-    result: Option<Result<T, String>>,
-    listeners: Vec<oneshot::Sender<Result<T, Error>>>,
-}
-
-impl <T: Clone> BroadcastData<T> {
-
-    pub fn new() -> Self {
-        Self {
-            result: None,
-            listeners: vec![],
-        }
-    }
-
-    pub fn notify_listeners(&mut self, result: Result<T, String>) {
-
-        self.result = Some(result.clone());
-
-        loop {
-            match self.listeners.pop() {
-                None => { break; },
-                Some(ch) => {
-                    match &result {
-                        Ok(result) => { let _ = ch.send(Ok(result.clone())); },
-                        Err(err) => { let _ = ch.send(Err(format_err!("{}", err))); },
-                    }
-                },
-            }
-        }
-    }
-
-    pub fn listen(&mut self) -> impl Future<Output = Result<T, Error>> {
-        use futures::future::{ok, Either};
-
-        match &self.result {
-            None => {},
-            Some(Ok(result)) => return Either::Left(ok(result.clone())),
-            Some(Err(err)) => return Either::Left(futures::future::err(format_err!("{}", err))),
-        }
-
-        let (tx, rx) = oneshot::channel::<Result<T, Error>>();
-
-        self.listeners.push(tx);
-
-        Either::Right(rx
-            .map(|res| match res {
-                Ok(Ok(t)) => Ok(t),
-                Ok(Err(e)) => Err(e),
-                Err(e) => Err(Error::from(e)),
-            })
-        )
-    }
-}
-
-type SourceFuture<T> = Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>;
-
-struct BroadCastFutureBinding<T> {
-    broadcast: BroadcastData<T>,
-    future: Option<SourceFuture<T>>,
-}
-
-/// Broadcast future results to registered listeners
-pub struct BroadcastFuture<T> {
-    inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
-}
-
-impl<T: Clone + Send + 'static> BroadcastFuture<T> {
-    /// Create instance for specified source future.
-    ///
-    /// The result of the future is sent to all registered listeners.
-    pub fn new(source: Box<dyn Future<Output = Result<T, Error>> + Send>) -> Self {
-        let inner = BroadCastFutureBinding {
-            broadcast: BroadcastData::new(),
-            future: Some(Pin::from(source)),
-        };
-        Self { inner: Arc::new(Mutex::new(inner)) }
-    }
-
-    /// Creates a new instance with a oneshot channel as trigger
-    pub fn new_oneshot() -> (Self, oneshot::Sender<Result<T, Error>>) {
-
-        let (tx, rx) = oneshot::channel::<Result<T, Error>>();
-        let rx = rx
-            .map_err(Error::from)
-            .and_then(futures::future::ready);
-
-        (Self::new(Box::new(rx)), tx)
-    }
-
-    fn notify_listeners(
-        inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
-        result: Result<T, String>,
-    ) {
-        let mut data = inner.lock().unwrap();
-        data.broadcast.notify_listeners(result);
-    }
-
-    fn spawn(inner: Arc<Mutex<BroadCastFutureBinding<T>>>) -> impl Future<Output = Result<T, Error>> {
-        let mut data = inner.lock().unwrap();
-
-        if let Some(source) = data.future.take() {
-
-            let inner1 = inner.clone();
-
-            let task = source.map(move |value| {
-                match value {
-                    Ok(value) => Self::notify_listeners(inner1, Ok(value)),
-                    Err(err) => Self::notify_listeners(inner1, Err(err.to_string())),
-                }
-            });
-            tokio::spawn(task);
-        }
-
-        data.broadcast.listen()
-    }
-
-    /// Register a listener
-    pub fn listen(&self) -> impl Future<Output = Result<T, Error>> {
-        let inner2 = self.inner.clone();
-        async move { Self::spawn(inner2).await }
-    }
-}
-
-#[test]
-fn test_broadcast_future() {
-    use std::sync::atomic::{AtomicUsize, Ordering};
-
-    static CHECKSUM: AtomicUsize  = AtomicUsize::new(0);
-
-    let (sender, trigger) = BroadcastFuture::new_oneshot();
-
-    let receiver1 = sender.listen()
-        .map_ok(|res| {
-            CHECKSUM.fetch_add(res, Ordering::SeqCst);
-        })
-        .map_err(|err| { panic!("got error {}", err); })
-        .map(|_| ());
-
-    let receiver2 = sender.listen()
-        .map_ok(|res| {
-            CHECKSUM.fetch_add(res*2, Ordering::SeqCst);
-        })
-        .map_err(|err| { panic!("got error {}", err); })
-        .map(|_| ());
-
-    let rt = tokio::runtime::Runtime::new().unwrap();
-    rt.block_on(async move {
-        let r1 = tokio::spawn(receiver1);
-        let r2 = tokio::spawn(receiver2);
-
-        trigger.send(Ok(1)).unwrap();
-        let _ = r1.await;
-        let _ = r2.await;
-    });
-
-    let result = CHECKSUM.load(Ordering::SeqCst);
-
-    assert_eq!(result, 3);
-
-    // the result stays available until the BroadcastFuture is dropped
-    rt.block_on(sender.listen()
-        .map_ok(|res| {
-            CHECKSUM.fetch_add(res*4, Ordering::SeqCst);
-        })
-        .map_err(|err| { panic!("got error {}", err); })
-        .map(|_| ()));
-
-    let result = CHECKSUM.load(Ordering::SeqCst);
-    assert_eq!(result, 7);
-}
diff --git a/src/tools/cert.rs b/src/tools/cert.rs
deleted file mode 100644 (file)
index cef04fe..0000000
+++ /dev/null
@@ -1,104 +0,0 @@
-use std::path::PathBuf;
-use std::mem::MaybeUninit;
-
-use anyhow::{bail, format_err, Error};
-use foreign_types::ForeignTypeRef;
-use openssl::x509::{X509, GeneralName};
-use openssl::stack::Stack;
-use openssl::pkey::{Public, PKey};
-
-use pbs_buildcfg::configdir;
-
-// C type:
-#[allow(non_camel_case_types)]
-type ASN1_TIME = <openssl::asn1::Asn1TimeRef as ForeignTypeRef>::CType;
-
-extern "C" {
-    fn ASN1_TIME_to_tm(s: *const ASN1_TIME, tm: *mut libc::tm) -> libc::c_int;
-}
-
-fn asn1_time_to_unix(time: &openssl::asn1::Asn1TimeRef) -> Result<i64, Error> {
-    let mut c_tm = MaybeUninit::<libc::tm>::uninit();
-    let rc = unsafe { ASN1_TIME_to_tm(time.as_ptr(), c_tm.as_mut_ptr()) };
-    if rc != 1 {
-        bail!("failed to parse ASN1 time");
-    }
-    let mut c_tm = unsafe { c_tm.assume_init() };
-    proxmox::tools::time::timegm(&mut c_tm)
-}
-
-pub struct CertInfo {
-    x509: X509,
-}
-
-fn x509name_to_string(name: &openssl::x509::X509NameRef) -> Result<String, Error> {
-    let mut parts = Vec::new();
-    for entry in name.entries() {
-        parts.push(format!("{} = {}", entry.object().nid().short_name()?, entry.data().as_utf8()?));
-    }
-    Ok(parts.join(", "))
-}
-
-impl CertInfo {
-    pub fn new() -> Result<Self, Error> {
-        Self::from_path(PathBuf::from(configdir!("/proxy.pem")))
-    }
-
-    pub fn from_path(path: PathBuf) -> Result<Self, Error> {
-        Self::from_pem(&proxmox::tools::fs::file_get_contents(&path)?)
-            .map_err(|err| format_err!("failed to load certificate from {:?} - {}", path, err))
-    }
-
-    pub fn from_pem(cert_pem: &[u8]) -> Result<Self, Error> {
-        let x509 = openssl::x509::X509::from_pem(&cert_pem)?;
-        Ok(Self{
-            x509
-        })
-    }
-
-    pub fn subject_alt_names(&self) -> Option<Stack<GeneralName>> {
-        self.x509.subject_alt_names()
-    }
-
-    pub fn subject_name(&self) -> Result<String, Error> {
-        Ok(x509name_to_string(self.x509.subject_name())?)
-    }
-
-    pub fn issuer_name(&self) -> Result<String, Error> {
-        Ok(x509name_to_string(self.x509.issuer_name())?)
-    }
-
-    pub fn fingerprint(&self) -> Result<String, Error> {
-        let fp = self.x509.digest(openssl::hash::MessageDigest::sha256())?;
-        let fp_string = proxmox::tools::digest_to_hex(&fp);
-        let fp_string = fp_string.as_bytes().chunks(2).map(|v| std::str::from_utf8(v).unwrap())
-            .collect::<Vec<&str>>().join(":");
-        Ok(fp_string)
-    }
-
-    pub fn public_key(&self) -> Result<PKey<Public>, Error> {
-        let pubkey = self.x509.public_key()?;
-        Ok(pubkey)
-    }
-
-    pub fn not_before(&self) -> &openssl::asn1::Asn1TimeRef {
-        self.x509.not_before()
-    }
-
-    pub fn not_after(&self) -> &openssl::asn1::Asn1TimeRef {
-        self.x509.not_after()
-    }
-
-    pub fn not_before_unix(&self) -> Result<i64, Error> {
-        asn1_time_to_unix(&self.not_before())
-    }
-
-    pub fn not_after_unix(&self) -> Result<i64, Error> {
-        asn1_time_to_unix(&self.not_after())
-    }
-
-    /// Check if the certificate is expired at or after a specific unix epoch.
-    pub fn is_expired_after_epoch(&self, epoch: i64) -> Result<bool, Error> {
-        Ok(self.not_after_unix()? < epoch)
-    }
-}
index 900f33c03146b5633ffd9bf5b4e05a56fcc55fde..658c70142411dc99c4e19f6aef8148cdf93558b9 100644 (file)
@@ -12,7 +12,6 @@ use std::path::Path;
 use anyhow::{bail, format_err, Error};
 use serde_json::Value;
 use openssl::hash::{hash, DigestBytes, MessageDigest};
-use percent_encoding::{utf8_percent_encode, AsciiSet};
 
 pub use proxmox::tools::fd::Fd;
 use proxmox::tools::fs::{create_path, CreateOptions};
@@ -33,7 +32,6 @@ pub use pbs_tools::process_locker::{
 pub mod acl;
 pub mod apt;
 pub mod async_io;
-pub mod cert;
 pub mod compression;
 pub mod config;
 pub mod cpio;
@@ -67,17 +65,10 @@ pub use wrapped_reader_stream::{AsyncReaderStream, StdChannelStream, WrappedRead
 mod async_channel_writer;
 pub use async_channel_writer::AsyncChannelWriter;
 
-mod std_channel_writer;
-pub use std_channel_writer::StdChannelWriter;
-
-mod tokio_writer_adapter;
-pub use tokio_writer_adapter::TokioWriterAdapter;
-
 mod file_logger;
 pub use file_logger::{FileLogger, FileLogOptions};
 
-mod broadcast_future;
-pub use broadcast_future::{BroadcastData, BroadcastFuture};
+pub use pbs_tools::broadcast_future::{BroadcastData, BroadcastFuture};
 
 /// The `BufferedRead` trait provides a single function
 /// `buffered_read`. It returns a reference to an internal buffer. The
@@ -235,11 +226,6 @@ pub fn extract_cookie(cookie: &str, cookie_name: &str) -> Option<String> {
     None
 }
 
-/// percent encode a url component
-pub fn percent_encode_component(comp: &str) -> String {
-    utf8_percent_encode(comp, percent_encoding::NON_ALPHANUMERIC).to_string()
-}
-
 /// Detect modified configuration files
 ///
 /// This function fails with a reasonable error message if checksums do not match.
@@ -355,22 +341,6 @@ pub fn pbs_simple_http(proxy_config: Option<ProxyConfig>) -> SimpleHttp {
     SimpleHttp::with_options(options)
 }
 
-/// This used to be: `SIMPLE_ENCODE_SET` plus space, `"`, `#`, `<`, `>`, backtick, `?`, `{`, `}`
-pub const DEFAULT_ENCODE_SET: &AsciiSet = &percent_encoding::CONTROLS // 0..1f and 7e
-    // The SIMPLE_ENCODE_SET adds space and anything >= 0x7e (7e itself is already included above)
-    .add(0x20)
-    .add(0x7f)
-    // the DEFAULT_ENCODE_SET added:
-    .add(b' ')
-    .add(b'"')
-    .add(b'#')
-    .add(b'<')
-    .add(b'>')
-    .add(b'`')
-    .add(b'?')
-    .add(b'{')
-    .add(b'}');
-
 /// Get an iterator over lines of a file, skipping empty lines and comments (lines starting with a
 /// `#`).
 pub fn file_get_non_comment_lines<P: AsRef<Path>>(
diff --git a/src/tools/std_channel_writer.rs b/src/tools/std_channel_writer.rs
deleted file mode 100644 (file)
index d2f6444..0000000
+++ /dev/null
@@ -1,28 +0,0 @@
-use std::io::Write;
-use std::sync::mpsc::SyncSender;
-
-use anyhow::{Error};
-
-/// Wrapper around SyncSender, which implements Write
-///
-/// Each write in translated into a send(Vec<u8>).
-pub struct StdChannelWriter(SyncSender<Result<Vec<u8>, Error>>);
-
-impl StdChannelWriter {
-    pub fn new(sender: SyncSender<Result<Vec<u8>, Error>>) -> Self {
-        Self(sender)
-    }
-}
-
-impl Write for StdChannelWriter {
-    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
-        self.0
-            .send(Ok(buf.to_vec()))
-            .map_err(proxmox::sys::error::io_err_other)
-            .and(Ok(buf.len()))
-    }
-
-    fn flush(&mut self) -> Result<(), std::io::Error> {
-        Ok(())
-    }
-}
diff --git a/src/tools/tokio_writer_adapter.rs b/src/tools/tokio_writer_adapter.rs
deleted file mode 100644 (file)
index 7b7f5dc..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-use std::io::Write;
-
-use tokio::task::block_in_place;
-
-/// Wrapper around a writer which implements Write
-///
-/// wraps each write with a 'block_in_place' so that
-/// any (blocking) writer can be safely used in async context in a
-/// tokio runtime
-pub struct TokioWriterAdapter<W: Write>(W);
-
-impl<W: Write> TokioWriterAdapter<W> {
-    pub fn new(writer: W) -> Self {
-        Self(writer)
-    }
-}
-
-impl<W: Write> Write for TokioWriterAdapter<W> {
-    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
-        block_in_place(|| self.0.write(buf))
-    }
-
-    fn flush(&mut self) -> Result<(), std::io::Error> {
-        block_in_place(|| self.0.flush())
-    }
-}