git.proxmox.com Git - proxmox-backup.git/commitdiff
use new proxmox-async crate
author Dietmar Maurer <dietmar@proxmox.com>
Fri, 19 Nov 2021 16:36:06 +0000 (17:36 +0100)
committer Dietmar Maurer <dietmar@proxmox.com>
Fri, 19 Nov 2021 17:03:22 +0000 (18:03 +0100)
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
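The change is almost entirely mechanical: the small pbs-runtime crate and several async helper modules from pbs-tools (blocking, broadcast_future, compression, stream, tokio, zip) are dropped in favour of the new proxmox-async crate, and all call sites switch to the proxmox_async paths. A minimal before/after sketch of the typical call-site change, assuming the moved helpers keep the signatures shown in the deleted sources further down:

    use anyhow::Error;

    async fn run() -> Result<(), Error> {
        // ... application logic ...
        Ok(())
    }

    fn main() {
        // Previously: pbs_runtime::main(run())
        // The same entry-point helper now lives in the proxmox-async crate:
        if let Err(err) = proxmox_async::runtime::main(run()) {
            eprintln!("ERROR: {}", err);
        }
    }
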
56 files changed:
Cargo.toml
Makefile
debian/control
examples/download-speed.rs
examples/h2client.rs
examples/h2s-client.rs
examples/h2s-server.rs
examples/h2server.rs
examples/test_chunk_speed2.rs
examples/upload-speed.rs
pbs-client/Cargo.toml
pbs-client/src/catalog_shell.rs
pbs-client/src/http_client.rs
pbs-client/src/pxar/extract.rs
pbs-client/src/pxar_backup_stream.rs
pbs-client/src/remote_chunk_reader.rs
pbs-client/src/tools/mod.rs
pbs-runtime/Cargo.toml [deleted file]
pbs-runtime/src/lib.rs [deleted file]
pbs-tools/Cargo.toml
pbs-tools/src/async_lru_cache.rs
pbs-tools/src/blocking.rs [deleted file]
pbs-tools/src/broadcast_future.rs [deleted file]
pbs-tools/src/compression.rs [deleted file]
pbs-tools/src/lib.rs
pbs-tools/src/stream.rs [deleted file]
pbs-tools/src/tokio/mod.rs [deleted file]
pbs-tools/src/tokio/tokio_writer_adapter.rs [deleted file]
pbs-tools/src/zip.rs [deleted file]
proxmox-backup-client/Cargo.toml
proxmox-backup-client/src/main.rs
proxmox-backup-client/src/mount.rs
proxmox-file-restore/Cargo.toml
proxmox-file-restore/src/main.rs
proxmox-rest-server/Cargo.toml
proxmox-rest-server/src/rest.rs
proxmox-rest-server/src/state.rs
proxmox-restore-daemon/Cargo.toml
proxmox-restore-daemon/src/main.rs
proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
pxar-bin/Cargo.toml
pxar-bin/src/main.rs
src/api2/admin/datastore.rs
src/api2/backup/mod.rs
src/api2/backup/upload_chunk.rs
src/api2/node/apt.rs
src/api2/reader/mod.rs
src/bin/proxmox-backup-api.rs
src/bin/proxmox-backup-debug.rs
src/bin/proxmox-backup-manager.rs
src/bin/proxmox-backup-proxy.rs
src/bin/proxmox-daily-update.rs
src/bin/proxmox-tape.rs
src/bin/proxmox_backup_debug/api.rs
src/server/pull.rs
src/tools/subscription.rs

index 556e28e2e7b2079720c692acde3ab41da6180029..871fdfaa0e588f4747f6602147659169a9af54fe 100644 (file)
@@ -25,7 +25,6 @@ members = [
     "pbs-config",
     "pbs-datastore",
     "pbs-fuse-loop",
-    "pbs-runtime",
     "proxmox-rest-server",
     "proxmox-rrd",
     "pbs-tape",
@@ -110,6 +109,7 @@ proxmox-sys = "0.1"
 
 proxmox-acme-rs = "0.3"
 proxmox-apt = "0.8.0"
+proxmox-async = "0.1"
 proxmox-openid = "0.9.0"
 
 pbs-api-types = { path = "pbs-api-types" }
@@ -117,7 +117,6 @@ pbs-buildcfg = { path = "pbs-buildcfg" }
 pbs-client = { path = "pbs-client" }
 pbs-config = { path = "pbs-config" }
 pbs-datastore = { path = "pbs-datastore" }
-pbs-runtime = { path = "pbs-runtime" }
 proxmox-rest-server = { path = "proxmox-rest-server" }
 proxmox-rrd = { path = "proxmox-rrd" }
 pbs-tools = { path = "pbs-tools" }
index 20d1382f2854e52d934319966aaf79e14bb4dd18..6a38931f6a3f1e205134ec78383846f8178441ea 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -38,7 +38,6 @@ SUBCRATES := \
        pbs-config \
        pbs-datastore \
        pbs-fuse-loop \
-       pbs-runtime \
        proxmox-rest-server \
        proxmox-rrd \
        pbs-tape \
index 780af008b99d99d16dcc8e0121a2d26653d8fb88..2731ee119e141a20f258ef174f5fea7a562c34f7 100644 (file)
@@ -44,6 +44,7 @@ Build-Depends: debhelper (>= 12),
  librust-proxmox-0.15+default-dev (>= 0.15.3-~~),
  librust-proxmox-0.15+sortable-macro-dev (>= 0.15.3-~~),
  librust-proxmox-0.15+tokio-dev (>= 0.15.3-~~),
+ librust-proxmox-async-0.1+default-dev,
  librust-proxmox-acme-rs-0.3+default-dev,
  librust-proxmox-apt-0.8+default-dev,
  librust-proxmox-borrow-1+default-dev,
index a248ae875a39788bd11fa7d053f6b5cfe5038321..75176db1b207de7d986b9fdd86ee8c2d531b2cd4 100644 (file)
@@ -59,7 +59,7 @@ async fn run() -> Result<(), Error> {
 }
 
 fn main() {
-    if let Err(err) = pbs_runtime::main(run()) {
+    if let Err(err) = proxmox_async::runtime::main(run()) {
         eprintln!("ERROR: {}", err);
     }
     println!("DONE");
index 754a122d5665b32fd1b9cf14ae626c6caa694837..2588631e4bb0900355262f4abae340c37267580d 100644 (file)
@@ -69,7 +69,7 @@ fn send_request(
 }
 
 fn main() -> Result<(), Error> {
-    pbs_runtime::main(run())
+    proxmox_async::runtime::main(run())
 }
 
 async fn run() -> Result<(), Error> {
index 4bd54a15918c698335fdb7af3fa819ab0ddd9f4b..356dbc592f464cb4e727e5b037f8f036f6bd0077 100644 (file)
@@ -69,7 +69,7 @@ fn send_request(
 }
 
 fn main() -> Result<(), Error> {
-    pbs_runtime::main(run())
+    proxmox_async::runtime::main(run())
 }
 
 async fn run() -> Result<(), Error> {
index a11476a82419a479696ded826df39bd6f2972a15..f1f08513756ae4915523a6a4465cc50617b0558c 100644 (file)
@@ -9,7 +9,7 @@ use tokio::net::{TcpListener, TcpStream};
 use pbs_buildcfg::configdir;
 
 fn main() -> Result<(), Error> {
-    pbs_runtime::main(run())
+    proxmox_async::runtime::main(run())
 }
 
 async fn run() -> Result<(), Error> {
index 98e06f5290878729a95a892b8321d7e15836cf96..5802fc8880675ad0dc58dbdeb35b56d9a81c89ef 100644 (file)
@@ -5,7 +5,7 @@ use hyper::{Body, Request, Response};
 use tokio::net::{TcpListener, TcpStream};
 
 fn main() -> Result<(), Error> {
-    pbs_runtime::main(run())
+    proxmox_async::runtime::main(run())
 }
 
 async fn run() -> Result<(), Error> {
index f641753282c9a3fab7e97776421f9af769aaeb9e..9274183aca1be0dffb2f91685d6124a3c69135e1 100644 (file)
@@ -13,7 +13,7 @@ use pbs_client::ChunkStream;
 // Note: I can currently get about 830MB/s
 
 fn main() {
-    if let Err(err) = pbs_runtime::main(run()) {
+    if let Err(err) = proxmox_async::runtime::main(run()) {
         panic!("ERROR: {}", err);
     }
 }
index b670d0606a51cf8375f1b3b51f18e586443706a9..04e35124aa22d6dd9b84dca78c29cd9a83330bac 100644 (file)
@@ -27,7 +27,7 @@ async fn upload_speed() -> Result<f64, Error> {
 }
 
 fn main()  {
-    match pbs_runtime::main(upload_speed()) {
+    match proxmox_async::runtime::main(upload_speed()) {
         Ok(mbs) => {
             println!("average upload speed: {} MB/s", mbs);
         }
index 135e4045ba8da627e178f9680cd59ce53ea9a05b..a12f512b4716495b10c716f31ccf4562d991527b 100644 (file)
@@ -29,6 +29,7 @@ xdg = "2.2"
 
 pathpatterns = "0.1.2"
 proxmox = "0.15.3"
+proxmox-async = "0.1"
 proxmox-fuse = "0.1.1"
 proxmox-http = { version = "0.5.4", features = [ "client", "http-helpers", "websocket" ] }
 proxmox-io = { version = "1", features = [ "tokio" ] }
@@ -41,5 +42,4 @@ pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-buildcfg = { path = "../pbs-buildcfg" }
 pbs-datastore = { path = "../pbs-datastore" }
-pbs-runtime = { path = "../pbs-runtime" }
 pbs-tools = { path = "../pbs-tools" }
index dbc23ef642ba51dbaec233c3d309fd57417a5a5b..aab4c98966cb5b518b5d36b9740736bbc9306d75 100644 (file)
@@ -19,7 +19,7 @@ use proxmox_router::cli::{self, CliCommand, CliCommandMap, CliHelper, CommandLin
 use proxmox_schema::api;
 use pxar::{EntryKind, Metadata};
 
-use pbs_runtime::block_in_place;
+use proxmox_async::runtime::block_in_place;
 use pbs_datastore::catalog::{self, DirEntryAttribute};
 
 use crate::pxar::Flags;
index defaef8a634d8221c0ce45bde77b759f5a666922..61f05f28df25923f8173085177f1e3a794d4a395 100644 (file)
@@ -22,9 +22,9 @@ use proxmox_router::HttpError;
 
 use proxmox_http::client::{HttpsConnector, RateLimiter};
 use proxmox_http::uri::build_authority;
+use proxmox_async::broadcast_future::BroadcastFuture;
 
 use pbs_api_types::{Authid, Userid};
-use pbs_tools::broadcast_future::BroadcastFuture;
 use pbs_tools::json::json_object_to_query;
 use pbs_tools::ticket;
 use pbs_tools::percent_encoding::DEFAULT_ENCODE_SET;
index 52c6bf34cb8f78702d593d8fca1475afe4c864cd..350f96cf112604d9f4ba575bd94a7d284a6f60bf 100644 (file)
@@ -25,7 +25,7 @@ use proxmox::c_result;
 use proxmox::tools::fs::{create_path, CreateOptions};
 use proxmox_io::{sparse_copy, sparse_copy_async};
 
-use pbs_tools::zip::{ZipEncoder, ZipEntry};
+use proxmox_async::zip::{ZipEncoder, ZipEntry};
 
 use crate::pxar::dir_stack::PxarDirStack;
 use crate::pxar::metadata;
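
The zip helpers move from pbs_tools::zip to proxmox_async::zip, as the import change above shows. A minimal sketch of the encoder API, mirroring the doc example in the removed pbs-tools/src/zip.rs further down; the file names are placeholders:

    use anyhow::Error;
    use proxmox_async::zip::{ZipEncoder, ZipEntry};

    async fn build_zip() -> Result<(), Error> {
        let target = tokio::fs::File::create("archive.zip").await?;
        let source = tokio::fs::File::open("foo.txt").await?;

        let mut zip = ZipEncoder::new(target);
        // ZipEntry::new(path, mtime, mode, is_file)
        zip.add_entry(ZipEntry::new("foo.txt", 0, 0o100644, true), Some(source))
            .await?;
        zip.finish().await?;
        Ok(())
    }
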
index d39eb6c4dd4b69cee45a7c58e51666df3793b9e0..863c3fb46abb6f600d4332e916e53b13513678a2 100644 (file)
@@ -12,9 +12,10 @@ use nix::dir::Dir;
 use nix::fcntl::OFlag;
 use nix::sys::stat::Mode;
 
+use proxmox_async::tokio_writer_adapter::TokioWriterAdapter;
+
 use pbs_datastore::catalog::CatalogWriter;
 use pbs_tools::sync::StdChannelWriter;
-use pbs_tools::tokio::TokioWriterAdapter;
 
 /// Stream implementation to encode and upload .pxar archives.
 ///
@@ -111,7 +112,7 @@ impl Stream for PxarBackupStream {
             }
         }
 
-        match pbs_runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
+        match proxmox_async::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
             Ok(data) => Poll::Ready(Some(data)),
             Err(_) => {
                 let error = self.error.lock().unwrap();
index 734cd29fa758e1f09b59c3f6794b58f36a0f05b5..ed7cda2dc072f75693c9b299b92b71a486e4df99 100644 (file)
@@ -5,12 +5,13 @@ use std::sync::{Arc, Mutex};
 
 use anyhow::{bail, Error};
 
+use proxmox_async::runtime::block_on;
+
 use pbs_tools::crypt_config::CryptConfig;
 use pbs_api_types::CryptMode;
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::read_chunk::ReadChunk;
 use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_runtime::block_on;
 
 use super::BackupReader;
 
index 539ad6627c51b4e57421615b8e0810cbc4f5bb1e..a38b1c87b832824bb7b3df05beeef06aa9a56a33 100644 (file)
@@ -194,7 +194,7 @@ pub async fn try_get(repo: &BackupRepository, url: &str) -> Value {
 }
 
 pub fn complete_backup_group(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
-    pbs_runtime::main(async { complete_backup_group_do(param).await })
+    proxmox_async::runtime::main(async { complete_backup_group_do(param).await })
 }
 
 pub async fn complete_backup_group_do(param: &HashMap<String, String>) -> Vec<String> {
@@ -224,7 +224,7 @@ pub async fn complete_backup_group_do(param: &HashMap<String, String>) -> Vec<St
 }
 
 pub fn complete_group_or_snapshot(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
-    pbs_runtime::main(async { complete_group_or_snapshot_do(arg, param).await })
+    proxmox_async::runtime::main(async { complete_group_or_snapshot_do(arg, param).await })
 }
 
 pub async fn complete_group_or_snapshot_do(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
@@ -243,7 +243,7 @@ pub async fn complete_group_or_snapshot_do(arg: &str, param: &HashMap<String, St
 }
 
 pub fn complete_backup_snapshot(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
-    pbs_runtime::main(async { complete_backup_snapshot_do(param).await })
+    proxmox_async::runtime::main(async { complete_backup_snapshot_do(param).await })
 }
 
 pub async fn complete_backup_snapshot_do(param: &HashMap<String, String>) -> Vec<String> {
@@ -275,7 +275,7 @@ pub async fn complete_backup_snapshot_do(param: &HashMap<String, String>) -> Vec
 }
 
 pub fn complete_server_file_name(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
-    pbs_runtime::main(async { complete_server_file_name_do(param).await })
+    proxmox_async::runtime::main(async { complete_server_file_name_do(param).await })
 }
 
 pub async fn complete_server_file_name_do(param: &HashMap<String, String>) -> Vec<String> {
@@ -366,7 +366,7 @@ pub fn complete_chunk_size(_arg: &str, _param: &HashMap<String, String>) -> Vec<
 }
 
 pub fn complete_auth_id(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
-    pbs_runtime::main(async { complete_auth_id_do(param).await })
+    proxmox_async::runtime::main(async { complete_auth_id_do(param).await })
 }
 
 pub async fn complete_auth_id_do(param: &HashMap<String, String>) -> Vec<String> {
diff --git a/pbs-runtime/Cargo.toml b/pbs-runtime/Cargo.toml
deleted file mode 100644 (file)
index 72c25ac..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-[package]
-name = "pbs-runtime"
-version = "0.1.0"
-authors = ["Proxmox Support Team <support@proxmox.com>"]
-edition = "2018"
-description = "tokio runtime related helpers required for binaries"
-
-[dependencies]
-lazy_static = "1.4"
-pin-utils = "0.1.0"
-tokio = { version = "1.6", features = [ "rt", "rt-multi-thread" ] }
diff --git a/pbs-runtime/src/lib.rs b/pbs-runtime/src/lib.rs
deleted file mode 100644 (file)
index baa7ded..0000000
+++ /dev/null
@@ -1,203 +0,0 @@
-//! Helpers for quirks of the current tokio runtime.
-
-use std::cell::RefCell;
-use std::future::Future;
-use std::sync::{Arc, Weak, Mutex};
-use std::task::{Context, Poll, RawWaker, Waker};
-use std::thread::{self, Thread};
-
-use lazy_static::lazy_static;
-use pin_utils::pin_mut;
-use tokio::runtime::{self, Runtime};
-
-thread_local! {
-    static BLOCKING: RefCell<bool> = RefCell::new(false);
-}
-
-fn is_in_tokio() -> bool {
-    tokio::runtime::Handle::try_current()
-        .is_ok()
-}
-
-fn is_blocking() -> bool {
-    BLOCKING.with(|v| *v.borrow())
-}
-
-struct BlockingGuard(bool);
-
-impl BlockingGuard {
-    fn set() -> Self {
-        Self(BLOCKING.with(|v| {
-            let old = *v.borrow();
-            *v.borrow_mut() = true;
-            old
-        }))
-    }
-}
-
-impl Drop for BlockingGuard {
-    fn drop(&mut self) {
-        BLOCKING.with(|v| {
-            *v.borrow_mut() = self.0;
-        });
-    }
-}
-
-lazy_static! {
-    // avoid openssl bug: https://github.com/openssl/openssl/issues/6214
-    // by dropping the runtime as early as possible
-    static ref RUNTIME: Mutex<Weak<Runtime>> = Mutex::new(Weak::new());
-}
-
-#[link(name = "crypto")]
-extern "C" {
-    fn OPENSSL_thread_stop();
-}
-
-/// Get or create the current main tokio runtime.
-///
-/// This makes sure that tokio's worker threads are marked for us so that we know whether we
-/// can/need to use `block_in_place` in our `block_on` helper.
-pub fn get_runtime_with_builder<F: Fn() -> runtime::Builder>(get_builder: F) -> Arc<Runtime> {
-
-    let mut guard = RUNTIME.lock().unwrap();
-
-    if let Some(rt) = guard.upgrade() { return rt; }
-
-    let mut builder = get_builder();
-    builder.on_thread_stop(|| {
-        // avoid openssl bug: https://github.com/openssl/openssl/issues/6214
-        // call OPENSSL_thread_stop to avoid race with openssl cleanup handlers
-        unsafe { OPENSSL_thread_stop(); }
-    });
-
-    let runtime = builder.build().expect("failed to spawn tokio runtime");
-    let rt = Arc::new(runtime);
-
-    *guard = Arc::downgrade(&rt);
-
-    rt
-}
-
-/// Get or create the current main tokio runtime.
-///
-/// This calls get_runtime_with_builder() using the tokio default threaded scheduler
-pub fn get_runtime() -> Arc<Runtime> {
-
-    get_runtime_with_builder(|| {
-        let mut builder = runtime::Builder::new_multi_thread();
-        builder.enable_all();
-        builder
-    })
-}
-
-
-/// Block on a synchronous piece of code.
-pub fn block_in_place<R>(fut: impl FnOnce() -> R) -> R {
-    // don't double-exit the context (tokio doesn't like that)
-    // also, if we're not actually in a tokio-worker we must not use block_in_place() either
-    if is_blocking() || !is_in_tokio() {
-        fut()
-    } else {
-        // we are in an actual tokio worker thread, block it:
-        tokio::task::block_in_place(move || {
-            let _guard = BlockingGuard::set();
-            fut()
-        })
-    }
-}
-
-/// Block on a future in this thread.
-pub fn block_on<F: Future>(fut: F) -> F::Output {
-    // don't double-exit the context (tokio doesn't like that)
-    if is_blocking() {
-        block_on_local_future(fut)
-    } else if is_in_tokio() {
-        // inside a tokio worker we need to tell tokio that we're about to really block:
-        tokio::task::block_in_place(move || {
-            let _guard = BlockingGuard::set();
-            block_on_local_future(fut)
-        })
-    } else {
-        // not a worker thread, not associated with a runtime, make sure we have a runtime (spawn
-        // it on demand if necessary), then enter it
-        let _guard = BlockingGuard::set();
-        let _enter_guard = get_runtime().enter();
-        get_runtime().block_on(fut)
-    }
-}
-
-/*
-fn block_on_impl<F>(mut fut: F) -> F::Output
-where
-    F: Future + Send,
-    F::Output: Send + 'static,
-{
-    let (tx, rx) = tokio::sync::oneshot::channel();
-    let fut_ptr = &mut fut as *mut F as usize; // hack to not require F to be 'static
-    tokio::spawn(async move {
-        let fut: F = unsafe { std::ptr::read(fut_ptr as *mut F) };
-        tx
-            .send(fut.await)
-            .map_err(drop)
-            .expect("failed to send block_on result to channel")
-    });
-
-    futures::executor::block_on(async move {
-        rx.await.expect("failed to receive block_on result from channel")
-    })
-    std::mem::forget(fut);
-}
-*/
-
-/// This used to be our tokio main entry point. Now this just calls out to `block_on` for
-/// compatibility, which will perform all the necessary tasks on-demand anyway.
-pub fn main<F: Future>(fut: F) -> F::Output {
-    block_on(fut)
-}
-
-fn block_on_local_future<F: Future>(fut: F) -> F::Output {
-    pin_mut!(fut);
-
-    let waker = Arc::new(thread::current());
-    let waker = thread_waker_clone(Arc::into_raw(waker) as *const ());
-    let waker = unsafe { Waker::from_raw(waker) };
-    let mut context = Context::from_waker(&waker);
-    loop {
-        match fut.as_mut().poll(&mut context) {
-            Poll::Ready(out) => return out,
-            Poll::Pending => thread::park(),
-        }
-    }
-}
-
-const THREAD_WAKER_VTABLE: std::task::RawWakerVTable = std::task::RawWakerVTable::new(
-    thread_waker_clone,
-    thread_waker_wake,
-    thread_waker_wake_by_ref,
-    thread_waker_drop,
-);
-
-fn thread_waker_clone(this: *const ()) -> RawWaker {
-    let this = unsafe { Arc::from_raw(this as *const Thread) };
-    let cloned = Arc::clone(&this);
-    let _ = Arc::into_raw(this);
-
-    RawWaker::new(Arc::into_raw(cloned) as *const (), &THREAD_WAKER_VTABLE)
-}
-
-fn thread_waker_wake(this: *const ()) {
-    let this = unsafe { Arc::from_raw(this as *const Thread) };
-    this.unpark();
-}
-
-fn thread_waker_wake_by_ref(this: *const ()) {
-    let this = unsafe { Arc::from_raw(this as *const Thread) };
-    this.unpark();
-    let _ = Arc::into_raw(this);
-}
-
-fn thread_waker_drop(this: *const ()) {
-    let this = unsafe { Arc::from_raw(this as *const Thread) };
-    drop(this);
-}
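
The runtime helpers defined above (get_runtime, block_in_place, block_on, main) are now provided by proxmox_async::runtime; judging by the untouched call sites in this commit, their signatures stay the same. A hedged usage sketch against the new location (the config path is made up):

    use proxmox_async::runtime::{block_in_place, block_on};

    fn read_file_from_async_context() -> std::io::Result<String> {
        // Marks the current tokio worker as blocked (or simply runs the
        // closure when called outside a runtime), as documented above.
        block_in_place(|| std::fs::read_to_string("/etc/hypothetical.cfg"))
    }

    fn value_from_sync_context() -> u64 {
        // Drives a future to completion, spawning the shared runtime on
        // demand when no runtime is active yet.
        block_on(async { 42 })
    }
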
index 14f61f48993b5f7f19fd7770ff582f2282c30c4c..d0c6b3ff90e61f9b28ca3b9f64426cfe3b043ae2 100644 (file)
@@ -33,13 +33,13 @@ walkdir = "2"
 zstd = { version = "0.6", features = [ "bindgen" ] }
 
 proxmox = { version = "0.15.3", default-features = false, features = [ "tokio" ] }
+proxmox-async = "0.1"
 proxmox-borrow = "1"
 proxmox-io = { version = "1", features = [ "tokio" ] }
 proxmox-lang = { version = "1" }
 proxmox-time = { version = "1" }
 
 pbs-buildcfg = { path = "../pbs-buildcfg" }
-pbs-runtime = { path = "../pbs-runtime" }
 
 [dev-dependencies]
 tokio = { version = "1.6", features = [ "macros" ] }
index 315a786248e1c01c75403457959064556fa6108a..be4ea891ac72e8b92d322e05277c8b113f90e041 100644 (file)
@@ -7,7 +7,7 @@ use std::collections::HashMap;
 use std::future::Future;
 use std::sync::{Arc, Mutex};
 
-use crate::broadcast_future::BroadcastFuture;
+use proxmox_async::broadcast_future::BroadcastFuture;
 use crate::lru_cache::LruCache;
 
 /// Interface for asynchronously getting values on cache misses.
diff --git a/pbs-tools/src/blocking.rs b/pbs-tools/src/blocking.rs
deleted file mode 100644 (file)
index f5828df..0000000
+++ /dev/null
@@ -1,99 +0,0 @@
-//! Async wrappers for blocking I/O (adding `block_in_place` around channels/readers)
-
-use std::io::{self, Read};
-use std::pin::Pin;
-use std::task::{Context, Poll};
-use std::sync::mpsc::Receiver;
-
-use futures::stream::Stream;
-
-use pbs_runtime::block_in_place;
-
-/// Wrapper struct to convert a Reader into a Stream
-pub struct WrappedReaderStream<R: Read + Unpin> {
-    reader: R,
-    buffer: Vec<u8>,
-}
-
-impl <R: Read + Unpin> WrappedReaderStream<R> {
-
-    pub fn new(reader: R) -> Self {
-        let mut buffer = Vec::with_capacity(64*1024);
-        unsafe { buffer.set_len(buffer.capacity()); }
-        Self { reader, buffer }
-    }
-}
-
-impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
-    type Item = Result<Vec<u8>, io::Error>;
-
-    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
-        let this = self.get_mut();
-        match block_in_place(|| this.reader.read(&mut this.buffer)) {
-            Ok(n) => {
-                if n == 0 {
-                    // EOF
-                    Poll::Ready(None)
-                } else {
-                    Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
-                }
-            }
-            Err(err) => Poll::Ready(Some(Err(err))),
-        }
-    }
-}
-
-/// Wrapper struct to convert a channel Receiver into a Stream
-pub struct StdChannelStream<T>(pub Receiver<T>);
-
-impl<T> Stream for StdChannelStream<T> {
-    type Item = T;
-
-    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
-        match block_in_place(|| self.0.recv()) {
-            Ok(data) => Poll::Ready(Some(data)),
-            Err(_) => Poll::Ready(None),// channel closed
-        }
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use std::io;
-
-    use anyhow::Error;
-    use futures::stream::TryStreamExt;
-
-    #[test]
-    fn test_wrapped_stream_reader() -> Result<(), Error> {
-        pbs_runtime::main(async {
-            run_wrapped_stream_reader_test().await
-        })
-    }
-
-    struct DummyReader(usize);
-
-    impl io::Read for DummyReader {
-        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-            self.0 += 1;
-
-            if self.0 >= 10 {
-                return Ok(0);
-            }
-
-            unsafe {
-                std::ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len());
-            }
-
-            Ok(buf.len())
-        }
-    }
-
-    async fn run_wrapped_stream_reader_test() -> Result<(), Error> {
-        let mut reader = super::WrappedReaderStream::new(DummyReader(0));
-        while let Some(_data) = reader.try_next().await? {
-            // just waiting
-        }
-        Ok(())
-    }
-}
diff --git a/pbs-tools/src/broadcast_future.rs b/pbs-tools/src/broadcast_future.rs
deleted file mode 100644 (file)
index 7bfd83b..0000000
+++ /dev/null
@@ -1,180 +0,0 @@
-use std::future::Future;
-use std::pin::Pin;
-use std::sync::{Arc, Mutex};
-
-use anyhow::{format_err, Error};
-use futures::future::{FutureExt, TryFutureExt};
-use tokio::sync::oneshot;
-
-/// Broadcast results to registered listeners using async oneshot channels
-#[derive(Default)]
-pub struct BroadcastData<T> {
-    result: Option<Result<T, String>>,
-    listeners: Vec<oneshot::Sender<Result<T, Error>>>,
-}
-
-impl <T: Clone> BroadcastData<T> {
-
-    pub fn new() -> Self {
-        Self {
-            result: None,
-            listeners: vec![],
-        }
-    }
-
-    pub fn notify_listeners(&mut self, result: Result<T, String>) {
-
-        self.result = Some(result.clone());
-
-        loop {
-            match self.listeners.pop() {
-                None => { break; },
-                Some(ch) => {
-                    match &result {
-                        Ok(result) => { let _ = ch.send(Ok(result.clone())); },
-                        Err(err) => { let _ = ch.send(Err(format_err!("{}", err))); },
-                    }
-                },
-            }
-        }
-    }
-
-    pub fn listen(&mut self) -> impl Future<Output = Result<T, Error>> {
-        use futures::future::{ok, Either};
-
-        match &self.result {
-            None => {},
-            Some(Ok(result)) => return Either::Left(ok(result.clone())),
-            Some(Err(err)) => return Either::Left(futures::future::err(format_err!("{}", err))),
-        }
-
-        let (tx, rx) = oneshot::channel::<Result<T, Error>>();
-
-        self.listeners.push(tx);
-
-        Either::Right(rx
-            .map(|res| match res {
-                Ok(Ok(t)) => Ok(t),
-                Ok(Err(e)) => Err(e),
-                Err(e) => Err(Error::from(e)),
-            })
-        )
-    }
-}
-
-type SourceFuture<T> = Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>;
-
-struct BroadCastFutureBinding<T> {
-    broadcast: BroadcastData<T>,
-    future: Option<SourceFuture<T>>,
-}
-
-/// Broadcast future results to registered listeners
-pub struct BroadcastFuture<T> {
-    inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
-}
-
-impl<T: Clone + Send + 'static> BroadcastFuture<T> {
-    /// Create instance for specified source future.
-    ///
-    /// The result of the future is sent to all registered listeners.
-    pub fn new(source: Box<dyn Future<Output = Result<T, Error>> + Send>) -> Self {
-        let inner = BroadCastFutureBinding {
-            broadcast: BroadcastData::new(),
-            future: Some(Pin::from(source)),
-        };
-        Self { inner: Arc::new(Mutex::new(inner)) }
-    }
-
-    /// Creates a new instance with a oneshot channel as trigger
-    pub fn new_oneshot() -> (Self, oneshot::Sender<Result<T, Error>>) {
-
-        let (tx, rx) = oneshot::channel::<Result<T, Error>>();
-        let rx = rx
-            .map_err(Error::from)
-            .and_then(futures::future::ready);
-
-        (Self::new(Box::new(rx)), tx)
-    }
-
-    fn notify_listeners(
-        inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
-        result: Result<T, String>,
-    ) {
-        let mut data = inner.lock().unwrap();
-        data.broadcast.notify_listeners(result);
-    }
-
-    fn spawn(inner: Arc<Mutex<BroadCastFutureBinding<T>>>) -> impl Future<Output = Result<T, Error>> {
-        let mut data = inner.lock().unwrap();
-
-        if let Some(source) = data.future.take() {
-
-            let inner1 = inner.clone();
-
-            let task = source.map(move |value| {
-                match value {
-                    Ok(value) => Self::notify_listeners(inner1, Ok(value)),
-                    Err(err) => Self::notify_listeners(inner1, Err(err.to_string())),
-                }
-            });
-            tokio::spawn(task);
-        }
-
-        data.broadcast.listen()
-    }
-
-    /// Register a listener
-    pub fn listen(&self) -> impl Future<Output = Result<T, Error>> {
-        let inner2 = self.inner.clone();
-        async move { Self::spawn(inner2).await }
-    }
-}
-
-#[test]
-fn test_broadcast_future() {
-    use std::sync::atomic::{AtomicUsize, Ordering};
-
-    static CHECKSUM: AtomicUsize  = AtomicUsize::new(0);
-
-    let (sender, trigger) = BroadcastFuture::new_oneshot();
-
-    let receiver1 = sender.listen()
-        .map_ok(|res| {
-            CHECKSUM.fetch_add(res, Ordering::SeqCst);
-        })
-        .map_err(|err| { panic!("got error {}", err); })
-        .map(|_| ());
-
-    let receiver2 = sender.listen()
-        .map_ok(|res| {
-            CHECKSUM.fetch_add(res*2, Ordering::SeqCst);
-        })
-        .map_err(|err| { panic!("got error {}", err); })
-        .map(|_| ());
-
-    let rt = tokio::runtime::Runtime::new().unwrap();
-    rt.block_on(async move {
-        let r1 = tokio::spawn(receiver1);
-        let r2 = tokio::spawn(receiver2);
-
-        trigger.send(Ok(1)).unwrap();
-        let _ = r1.await;
-        let _ = r2.await;
-    });
-
-    let result = CHECKSUM.load(Ordering::SeqCst);
-
-    assert_eq!(result, 3);
-
-    // the result stays available until the BroadcastFuture is dropped
-    rt.block_on(sender.listen()
-        .map_ok(|res| {
-            CHECKSUM.fetch_add(res*4, Ordering::SeqCst);
-        })
-        .map_err(|err| { panic!("got error {}", err); })
-        .map(|_| ()));
-
-    let result = CHECKSUM.load(Ordering::SeqCst);
-    assert_eq!(result, 7);
-}
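
BroadcastFuture is now imported from proxmox_async::broadcast_future (see the http_client.rs and async_lru_cache.rs hunks). A condensed sketch of the flow the deleted unit test above exercises, assuming the type moved as-is; it has to run inside a tokio runtime because the source result is forwarded via tokio::spawn:

    use anyhow::{format_err, Error};
    use proxmox_async::broadcast_future::BroadcastFuture;

    async fn broadcast_example() -> Result<(), Error> {
        let (sender, trigger) = BroadcastFuture::<usize>::new_oneshot();

        // Every registered listener receives a clone of the final result.
        let first = sender.listen();
        let second = sender.listen();

        trigger
            .send(Ok(1))
            .map_err(|_| format_err!("all listeners dropped"))?;

        assert_eq!(first.await?, 1);
        assert_eq!(second.await?, 1);
        Ok(())
    }
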
diff --git a/pbs-tools/src/compression.rs b/pbs-tools/src/compression.rs
deleted file mode 100644 (file)
index aa1b0b2..0000000
+++ /dev/null
@@ -1,194 +0,0 @@
-use std::io;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-use anyhow::Error;
-use bytes::Bytes;
-use flate2::{Compress, Compression, FlushCompress};
-use futures::ready;
-use futures::stream::Stream;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
-
-use proxmox::io_format_err;
-use proxmox_io::ByteBuffer;
-
-const BUFFER_SIZE: usize = 8192;
-
-pub enum Level {
-    Fastest,
-    Best,
-    Default,
-    Precise(u32),
-}
-
-#[derive(Eq, PartialEq)]
-enum EncoderState {
-    Reading,
-    Writing,
-    Flushing,
-    Finished,
-}
-
-pub struct DeflateEncoder<T> {
-    inner: T,
-    compressor: Compress,
-    buffer: ByteBuffer,
-    input_buffer: Bytes,
-    state: EncoderState,
-}
-
-impl<T> DeflateEncoder<T> {
-    pub fn new(inner: T) -> Self {
-        Self::with_quality(inner, Level::Default)
-    }
-
-    pub fn with_quality(inner: T, level: Level) -> Self {
-        let level = match level {
-            Level::Fastest => Compression::fast(),
-            Level::Best => Compression::best(),
-            Level::Default => Compression::new(3),
-            Level::Precise(val) => Compression::new(val),
-        };
-
-        Self {
-            inner,
-            compressor: Compress::new(level, false),
-            buffer: ByteBuffer::with_capacity(BUFFER_SIZE),
-            input_buffer: Bytes::new(),
-            state: EncoderState::Reading,
-        }
-    }
-
-    pub fn total_in(&self) -> u64 {
-        self.compressor.total_in()
-    }
-
-    pub fn total_out(&self) -> u64 {
-        self.compressor.total_out()
-    }
-
-    pub fn into_inner(self) -> T {
-        self.inner
-    }
-
-    fn encode(
-        &mut self,
-        inbuf: &[u8],
-        flush: FlushCompress,
-    ) -> Result<(usize, flate2::Status), io::Error> {
-        let old_in = self.compressor.total_in();
-        let old_out = self.compressor.total_out();
-        let res = self
-            .compressor
-            .compress(&inbuf[..], self.buffer.get_free_mut_slice(), flush)?;
-        let new_in = (self.compressor.total_in() - old_in) as usize;
-        let new_out = (self.compressor.total_out() - old_out) as usize;
-        self.buffer.add_size(new_out);
-
-        Ok((new_in, res))
-    }
-}
-
-impl DeflateEncoder<Vec<u8>> {
-    // assume small files
-    pub async fn compress_vec<R>(&mut self, reader: &mut R, size_hint: usize) -> Result<(), Error>
-    where
-        R: AsyncRead + Unpin,
-    {
-        let mut buffer = Vec::with_capacity(size_hint);
-        reader.read_to_end(&mut buffer).await?;
-        self.inner.reserve(size_hint); // should be enough since we want smaller files
-        self.compressor.compress_vec(&buffer[..], &mut self.inner, FlushCompress::Finish)?;
-        Ok(())
-    }
-}
-
-impl<T: AsyncWrite + Unpin> DeflateEncoder<T> {
-    pub async fn compress<R>(&mut self, reader: &mut R) -> Result<(), Error>
-    where
-        R: AsyncRead + Unpin,
-    {
-        let mut buffer = ByteBuffer::with_capacity(BUFFER_SIZE);
-        let mut eof = false;
-        loop {
-            if !eof && !buffer.is_full() {
-                let read = buffer.read_from_async(reader).await?;
-                if read == 0 {
-                    eof = true;
-                }
-            }
-            let (read, _res) = self.encode(&buffer[..], FlushCompress::None)?;
-            buffer.consume(read);
-
-            self.inner.write_all(&self.buffer[..]).await?;
-            self.buffer.clear();
-
-            if buffer.is_empty() && eof {
-                break;
-            }
-        }
-
-        loop {
-            let (_read, res) = self.encode(&[][..], FlushCompress::Finish)?;
-            self.inner.write_all(&self.buffer[..]).await?;
-            self.buffer.clear();
-            if res == flate2::Status::StreamEnd {
-                break;
-            }
-        }
-
-        Ok(())
-    }
-}
-
-impl<T, O> Stream for DeflateEncoder<T>
-where
-    T: Stream<Item = Result<O, io::Error>> + Unpin,
-    O: Into<Bytes>
-{
-    type Item = Result<Bytes, io::Error>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let this = self.get_mut();
-
-        loop {
-            match this.state {
-                EncoderState::Reading => {
-                    if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) {
-                        let buf = res?;
-                        this.input_buffer = buf.into();
-                        this.state = EncoderState::Writing;
-                    } else {
-                        this.state = EncoderState::Flushing;
-                    }
-                }
-                EncoderState::Writing => {
-                    if this.input_buffer.is_empty() {
-                        return Poll::Ready(Some(Err(io_format_err!("empty input during write"))));
-                    }
-                    let mut buf = this.input_buffer.split_off(0);
-                    let (read, res) = this.encode(&buf[..], FlushCompress::None)?;
-                    this.input_buffer = buf.split_off(read);
-                    if this.input_buffer.is_empty() {
-                        this.state = EncoderState::Reading;
-                    }
-                    if this.buffer.is_full() || res == flate2::Status::BufError {
-                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
-                        return Poll::Ready(Some(Ok(bytes.into())));
-                    }
-                }
-                EncoderState::Flushing => {
-                    let (_read, res) = this.encode(&[][..], FlushCompress::Finish)?;
-                    if !this.buffer.is_empty() {
-                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
-                        return Poll::Ready(Some(Ok(bytes.into())));
-                    }
-                    if res == flate2::Status::StreamEnd {
-                        this.state = EncoderState::Finished;
-                    }
-                }
-                EncoderState::Finished => return Poll::Ready(None),
-            }
-        }
-    }
-}
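
None of the hunks shown here contain a call site for the deflate encoder, so its new module path is an assumption; presumably it lands next to the other moved helpers as proxmox_async::compression. A sketch against the deleted definitions above (the input file name is made up):

    use anyhow::Error;
    // Assumed new path; not confirmed by any hunk in this commitdiff.
    use proxmox_async::compression::{DeflateEncoder, Level};

    async fn deflate_small_file() -> Result<Vec<u8>, Error> {
        let mut source = tokio::fs::File::open("input.txt").await?;
        let mut enc = DeflateEncoder::with_quality(Vec::new(), Level::Fastest);
        // compress_vec() reads the whole (small) input and deflates it into
        // the inner Vec<u8> in one pass.
        enc.compress_vec(&mut source, 64 * 1024).await?;
        Ok(enc.into_inner())
    }
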
index 3b47010961beea373f8b1ea4597515f73ce6488a..9ea4ea5c940e258b3b5b5011538ebea012bcd119 100644 (file)
@@ -1,9 +1,6 @@
 pub mod acl;
-pub mod blocking;
-pub mod broadcast_future;
 pub mod cert;
 pub mod cli;
-pub mod compression;
 pub mod crypt_config;
 pub mod format;
 pub mod fs;
@@ -14,13 +11,10 @@ pub mod nom;
 pub mod percent_encoding;
 pub mod sha;
 pub mod str;
-pub mod stream;
 pub mod sync;
 pub mod sys;
 pub mod ticket;
-pub mod tokio;
 pub mod xattr;
-pub mod zip;
 
 pub mod async_lru_cache;
 
diff --git a/pbs-tools/src/stream.rs b/pbs-tools/src/stream.rs
deleted file mode 100644 (file)
index cfee01c..0000000
+++ /dev/null
@@ -1,229 +0,0 @@
-//! Wrappers between async readers and streams.
-
-use std::io::{self, Read};
-use std::future::Future;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-use anyhow::{Error, Result};
-use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
-use tokio::sync::mpsc::Sender;
-use futures::ready;
-use futures::future::FutureExt;
-use futures::stream::Stream;
-
-use proxmox::io_format_err;
-use proxmox::sys::error::io_err_other;
-use proxmox_io::ByteBuffer;
-
-use pbs_runtime::block_in_place;
-
-/// Wrapper struct to convert a Reader into a Stream
-pub struct WrappedReaderStream<R: Read + Unpin> {
-    reader: R,
-    buffer: Vec<u8>,
-}
-
-impl <R: Read + Unpin> WrappedReaderStream<R> {
-
-    pub fn new(reader: R) -> Self {
-        let mut buffer = Vec::with_capacity(64*1024);
-        unsafe { buffer.set_len(buffer.capacity()); }
-        Self { reader, buffer }
-    }
-}
-
-impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
-    type Item = Result<Vec<u8>, io::Error>;
-
-    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
-        let this = self.get_mut();
-        match block_in_place(|| this.reader.read(&mut this.buffer)) {
-            Ok(n) => {
-                if n == 0 {
-                    // EOF
-                    Poll::Ready(None)
-                } else {
-                    Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
-                }
-            }
-            Err(err) => Poll::Ready(Some(Err(err))),
-        }
-    }
-}
-
-/// Wrapper struct to convert an AsyncReader into a Stream
-pub struct AsyncReaderStream<R: AsyncRead + Unpin> {
-    reader: R,
-    buffer: Vec<u8>,
-}
-
-impl <R: AsyncRead + Unpin> AsyncReaderStream<R> {
-
-    pub fn new(reader: R) -> Self {
-        let mut buffer = Vec::with_capacity(64*1024);
-        unsafe { buffer.set_len(buffer.capacity()); }
-        Self { reader, buffer }
-    }
-
-    pub fn with_buffer_size(reader: R, buffer_size: usize) -> Self {
-        let mut buffer = Vec::with_capacity(buffer_size);
-        unsafe { buffer.set_len(buffer.capacity()); }
-        Self { reader, buffer }
-    }
-}
-
-impl<R: AsyncRead + Unpin> Stream for AsyncReaderStream<R> {
-    type Item = Result<Vec<u8>, io::Error>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
-        let this = self.get_mut();
-        let mut read_buf = ReadBuf::new(&mut this.buffer);
-        match ready!(Pin::new(&mut this.reader).poll_read(cx, &mut read_buf)) {
-            Ok(()) => {
-                let n = read_buf.filled().len();
-                if n == 0 {
-                    // EOF
-                    Poll::Ready(None)
-                } else {
-                    Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
-                }
-            }
-            Err(err) => Poll::Ready(Some(Err(err))),
-        }
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use std::io;
-
-    use anyhow::Error;
-    use futures::stream::TryStreamExt;
-
-    #[test]
-    fn test_wrapped_stream_reader() -> Result<(), Error> {
-        pbs_runtime::main(async {
-            run_wrapped_stream_reader_test().await
-        })
-    }
-
-    struct DummyReader(usize);
-
-    impl io::Read for DummyReader {
-        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-            self.0 += 1;
-
-            if self.0 >= 10 {
-                return Ok(0);
-            }
-
-            unsafe {
-                std::ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len());
-            }
-
-            Ok(buf.len())
-        }
-    }
-
-    async fn run_wrapped_stream_reader_test() -> Result<(), Error> {
-        let mut reader = super::WrappedReaderStream::new(DummyReader(0));
-        while let Some(_data) = reader.try_next().await? {
-            // just waiting
-        }
-        Ok(())
-    }
-}
-
-/// Wrapper around tokio::sync::mpsc::Sender, which implements Write
-pub struct AsyncChannelWriter {
-    sender: Option<Sender<Result<Vec<u8>, Error>>>,
-    buf: ByteBuffer,
-    state: WriterState,
-}
-
-type SendResult = io::Result<Sender<Result<Vec<u8>>>>;
-
-enum WriterState {
-    Ready,
-    Sending(Pin<Box<dyn Future<Output = SendResult> + Send + 'static>>),
-}
-
-impl AsyncChannelWriter {
-    pub fn new(sender: Sender<Result<Vec<u8>, Error>>, buf_size: usize) -> Self {
-        Self {
-            sender: Some(sender),
-            buf: ByteBuffer::with_capacity(buf_size),
-            state: WriterState::Ready,
-        }
-    }
-
-    fn poll_write_impl(
-        &mut self,
-        cx: &mut Context,
-        buf: &[u8],
-        flush: bool,
-    ) -> Poll<io::Result<usize>> {
-        loop {
-            match &mut self.state {
-                WriterState::Ready => {
-                    if flush {
-                        if self.buf.is_empty() {
-                            return Poll::Ready(Ok(0));
-                        }
-                    } else {
-                        let free_size = self.buf.free_size();
-                        if free_size > buf.len() || self.buf.is_empty() {
-                            let count = free_size.min(buf.len());
-                            self.buf.get_free_mut_slice()[..count].copy_from_slice(&buf[..count]);
-                            self.buf.add_size(count);
-                            return Poll::Ready(Ok(count));
-                        }
-                    }
-
-                    let sender = match self.sender.take() {
-                        Some(sender) => sender,
-                        None => return Poll::Ready(Err(io_err_other("no sender"))),
-                    };
-
-                    let data = self.buf.remove_data(self.buf.len()).to_vec();
-                    let future = async move {
-                        sender
-                            .send(Ok(data))
-                            .await
-                            .map(move |_| sender)
-                            .map_err(|err| io_format_err!("could not send: {}", err))
-                    };
-
-                    self.state = WriterState::Sending(future.boxed());
-                }
-                WriterState::Sending(ref mut future) => match ready!(future.as_mut().poll(cx)) {
-                    Ok(sender) => {
-                        self.sender = Some(sender);
-                        self.state = WriterState::Ready;
-                    }
-                    Err(err) => return Poll::Ready(Err(err)),
-                },
-            }
-        }
-    }
-}
-
-impl AsyncWrite for AsyncChannelWriter {
-    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
-        let this = self.get_mut();
-        this.poll_write_impl(cx, buf, false)
-    }
-
-    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
-        let this = self.get_mut();
-        match ready!(this.poll_write_impl(cx, &[], true)) {
-            Ok(_) => Poll::Ready(Ok(())),
-            Err(err) => Poll::Ready(Err(err)),
-        }
-    }
-
-    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
-        self.poll_flush(cx)
-    }
-}
diff --git a/pbs-tools/src/tokio/mod.rs b/pbs-tools/src/tokio/mod.rs
deleted file mode 100644 (file)
index 43fc107..0000000
+++ /dev/null
@@ -1,2 +0,0 @@
-pub mod tokio_writer_adapter;
-pub use tokio_writer_adapter::TokioWriterAdapter;
diff --git a/pbs-tools/src/tokio/tokio_writer_adapter.rs b/pbs-tools/src/tokio/tokio_writer_adapter.rs
deleted file mode 100644 (file)
index 7b7f5dc..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-use std::io::Write;
-
-use tokio::task::block_in_place;
-
-/// Wrapper around a writer which implements Write
-///
-/// wraps each write with a 'block_in_place' so that
-/// any (blocking) writer can be safely used in async context in a
-/// tokio runtime
-pub struct TokioWriterAdapter<W: Write>(W);
-
-impl<W: Write> TokioWriterAdapter<W> {
-    pub fn new(writer: W) -> Self {
-        Self(writer)
-    }
-}
-
-impl<W: Write> Write for TokioWriterAdapter<W> {
-    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
-        block_in_place(|| self.0.write(buf))
-    }
-
-    fn flush(&mut self) -> Result<(), std::io::Error> {
-        block_in_place(|| self.0.flush())
-    }
-}
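
The adapter is re-imported from proxmox_async::tokio_writer_adapter in the pxar_backup_stream.rs hunk above. A short sketch of what it is for:

    use std::io::Write;

    use proxmox_async::tokio_writer_adapter::TokioWriterAdapter;

    // Wrap any blocking writer so it can be used from async code: every
    // write()/flush() is routed through tokio's block_in_place(), as the
    // deleted implementation above shows.
    fn wrapped_stdout() -> impl Write {
        TokioWriterAdapter::new(std::io::stdout())
    }
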
diff --git a/pbs-tools/src/zip.rs b/pbs-tools/src/zip.rs
deleted file mode 100644 (file)
index 0e85dd2..0000000
+++ /dev/null
@@ -1,672 +0,0 @@
-//! ZIP Helper
-//!
-//! Provides an interface to create a ZIP File from ZipEntries
-//! for a more detailed description of the ZIP format, see:
-//! https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT
-
-use std::convert::TryInto;
-use std::ffi::OsString;
-use std::io;
-use std::mem::size_of;
-use std::os::unix::ffi::OsStrExt;
-use std::path::{Component, Path, PathBuf};
-use std::pin::Pin;
-use std::task::{Context, Poll};
-use std::time::SystemTime;
-
-use anyhow::{format_err, Error, Result};
-use endian_trait::Endian;
-use futures::ready;
-use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
-
-use crc32fast::Hasher;
-use proxmox_time::gmtime;
-
-use crate::compression::{DeflateEncoder, Level};
-
-const LOCAL_FH_SIG: u32 = 0x04034B50;
-const LOCAL_FF_SIG: u32 = 0x08074B50;
-const CENTRAL_DIRECTORY_FH_SIG: u32 = 0x02014B50;
-const END_OF_CENTRAL_DIR: u32 = 0x06054B50;
-const VERSION_NEEDED: u16 = 0x002d;
-const VERSION_MADE_BY: u16 = 0x032d;
-
-const ZIP64_EOCD_RECORD: u32 = 0x06064B50;
-const ZIP64_EOCD_LOCATOR: u32 = 0x07064B50;
-
-// bits for time:
-// 0-4: day of the month (1-31)
-// 5-8: month: (1 = jan, etc.)
-// 9-15: year offset from 1980
-//
-// bits for date:
-// 0-4: second / 2
-// 5-10: minute (0-59)
-// 11-15: hour (0-23)
-//
-// see https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-filetimetodosdatetime
-fn epoch_to_dos(epoch: i64) -> (u16, u16) {
-    let gmtime = match gmtime(epoch) {
-        Ok(gmtime) => gmtime,
-        Err(_) => return (0, 0),
-    };
-
-    let seconds = (gmtime.tm_sec / 2) & 0b11111;
-    let minutes = gmtime.tm_min & 0xb111111;
-    let hours = gmtime.tm_hour & 0b11111;
-    let time: u16 = ((hours << 11) | (minutes << 5) | (seconds)) as u16;
-
-    let date: u16 = if gmtime.tm_year > (2108 - 1900) || gmtime.tm_year < (1980 - 1900) {
-        0
-    } else {
-        let day = gmtime.tm_mday & 0b11111;
-        let month = (gmtime.tm_mon + 1) & 0b1111;
-        let year = (gmtime.tm_year + 1900 - 1980) & 0b1111111;
-        ((year << 9) | (month << 5) | (day)) as u16
-    };
-
-    (date, time)
-}
-
-#[derive(Endian)]
-#[repr(C, packed)]
-struct Zip64Field {
-    field_type: u16,
-    field_size: u16,
-    uncompressed_size: u64,
-    compressed_size: u64,
-}
-
-#[derive(Endian)]
-#[repr(C, packed)]
-struct Zip64FieldWithOffset {
-    field_type: u16,
-    field_size: u16,
-    uncompressed_size: u64,
-    compressed_size: u64,
-    offset: u64,
-    start_disk: u32,
-}
-
-#[derive(Endian)]
-#[repr(C, packed)]
-struct LocalFileHeader {
-    signature: u32,
-    version_needed: u16,
-    flags: u16,
-    compression: u16,
-    time: u16,
-    date: u16,
-    crc32: u32,
-    compressed_size: u32,
-    uncompressed_size: u32,
-    filename_len: u16,
-    extra_field_len: u16,
-}
-
-#[derive(Endian)]
-#[repr(C, packed)]
-struct LocalFileFooter {
-    signature: u32,
-    crc32: u32,
-    compressed_size: u64,
-    uncompressed_size: u64,
-}
-
-#[derive(Endian)]
-#[repr(C, packed)]
-struct CentralDirectoryFileHeader {
-    signature: u32,
-    version_made_by: u16,
-    version_needed: u16,
-    flags: u16,
-    compression: u16,
-    time: u16,
-    date: u16,
-    crc32: u32,
-    compressed_size: u32,
-    uncompressed_size: u32,
-    filename_len: u16,
-    extra_field_len: u16,
-    comment_len: u16,
-    start_disk: u16,
-    internal_flags: u16,
-    external_flags: u32,
-    offset: u32,
-}
-
-#[derive(Endian)]
-#[repr(C, packed)]
-struct EndOfCentralDir {
-    signature: u32,
-    disk_number: u16,
-    start_disk: u16,
-    disk_record_count: u16,
-    total_record_count: u16,
-    directory_size: u32,
-    directory_offset: u32,
-    comment_len: u16,
-}
-
-#[derive(Endian)]
-#[repr(C, packed)]
-struct Zip64EOCDRecord {
-    signature: u32,
-    field_size: u64,
-    version_made_by: u16,
-    version_needed: u16,
-    disk_number: u32,
-    disk_number_central_dir: u32,
-    disk_record_count: u64,
-    total_record_count: u64,
-    directory_size: u64,
-    directory_offset: u64,
-}
-
-#[derive(Endian)]
-#[repr(C, packed)]
-struct Zip64EOCDLocator {
-    signature: u32,
-    disk_number: u32,
-    offset: u64,
-    disk_count: u32,
-}
-
-async fn write_struct<E, T>(output: &mut T, data: E) -> io::Result<()>
-where
-    T: AsyncWrite + ?Sized + Unpin,
-    E: Endian,
-{
-    let data = data.to_le();
-
-    let data = unsafe {
-        std::slice::from_raw_parts(
-            &data as *const E as *const u8,
-            core::mem::size_of_val(&data),
-        )
-    };
-    output.write_all(data).await
-}
-
-/// Represents an Entry in a ZIP File
-///
-/// used to add to a ZipEncoder
-pub struct ZipEntry {
-    filename: OsString,
-    mtime: i64,
-    mode: u16,
-    crc32: u32,
-    uncompressed_size: u64,
-    compressed_size: u64,
-    offset: u64,
-    is_file: bool,
-}
-
-impl ZipEntry {
-    /// Creates a new ZipEntry
-    ///
-    /// if is_file is false the path will contain a trailing separator,
-    /// so that the zip file understands that it is a directory
-    pub fn new<P: AsRef<Path>>(path: P, mtime: i64, mode: u16, is_file: bool) -> Self {
-        let mut relpath = PathBuf::new();
-
-        for comp in path.as_ref().components() {
-            if let Component::Normal(_) = comp {
-                relpath.push(comp);
-            }
-        }
-
-        if !is_file {
-            relpath.push(""); // adds trailing slash
-        }
-
-        Self {
-            filename: relpath.into(),
-            crc32: 0,
-            mtime,
-            mode,
-            uncompressed_size: 0,
-            compressed_size: 0,
-            offset: 0,
-            is_file,
-        }
-    }
-
-    async fn write_local_header<W>(&self, mut buf: &mut W) -> io::Result<usize>
-    where
-        W: AsyncWrite + Unpin + ?Sized,
-    {
-        let filename = self.filename.as_bytes();
-        let filename_len = filename.len();
-        let header_size = size_of::<LocalFileHeader>();
-        let zip_field_size = size_of::<Zip64Field>();
-        let size: usize = header_size + filename_len + zip_field_size;
-
-        let (date, time) = epoch_to_dos(self.mtime);
-
-        write_struct(
-            &mut buf,
-            LocalFileHeader {
-                signature: LOCAL_FH_SIG,
-                version_needed: 0x2d,
-                flags: 1 << 3,
-                compression: 0x8,
-                time,
-                date,
-                crc32: 0,
-                compressed_size: 0xFFFFFFFF,
-                uncompressed_size: 0xFFFFFFFF,
-                filename_len: filename_len as u16,
-                extra_field_len: zip_field_size as u16,
-            },
-        )
-        .await?;
-
-        buf.write_all(filename).await?;
-
-        write_struct(
-            &mut buf,
-            Zip64Field {
-                field_type: 0x0001,
-                field_size: 2 * 8,
-                uncompressed_size: 0,
-                compressed_size: 0,
-            },
-        )
-        .await?;
-
-        Ok(size)
-    }
-
-    async fn write_data_descriptor<W: AsyncWrite + Unpin + ?Sized>(
-        &self,
-        mut buf: &mut W,
-    ) -> io::Result<usize> {
-        let size = size_of::<LocalFileFooter>();
-
-        write_struct(
-            &mut buf,
-            LocalFileFooter {
-                signature: LOCAL_FF_SIG,
-                crc32: self.crc32,
-                compressed_size: self.compressed_size,
-                uncompressed_size: self.uncompressed_size,
-            },
-        )
-        .await?;
-
-        Ok(size)
-    }
-
-    async fn write_central_directory_header<W: AsyncWrite + Unpin + ?Sized>(
-        &self,
-        mut buf: &mut W,
-    ) -> io::Result<usize> {
-        let filename = self.filename.as_bytes();
-        let filename_len = filename.len();
-        let header_size = size_of::<CentralDirectoryFileHeader>();
-        let zip_field_size = size_of::<Zip64FieldWithOffset>();
-        let mut size: usize = header_size + filename_len;
-
-        let (date, time) = epoch_to_dos(self.mtime);
-
-        let (compressed_size, uncompressed_size, offset, need_zip64) = if self.compressed_size
-            >= (u32::MAX as u64)
-            || self.uncompressed_size >= (u32::MAX as u64)
-            || self.offset >= (u32::MAX as u64)
-        {
-            size += zip_field_size;
-            (0xFFFFFFFF, 0xFFFFFFFF, 0xFFFFFFFF, true)
-        } else {
-            (
-                self.compressed_size as u32,
-                self.uncompressed_size as u32,
-                self.offset as u32,
-                false,
-            )
-        };
-
-        write_struct(
-            &mut buf,
-            CentralDirectoryFileHeader {
-                signature: CENTRAL_DIRECTORY_FH_SIG,
-                version_made_by: VERSION_MADE_BY,
-                version_needed: VERSION_NEEDED,
-                flags: 1 << 3,
-                compression: 0x8,
-                time,
-                date,
-                crc32: self.crc32,
-                compressed_size,
-                uncompressed_size,
-                filename_len: filename_len as u16,
-                extra_field_len: if need_zip64 { zip_field_size as u16 } else { 0 },
-                comment_len: 0,
-                start_disk: 0,
-                internal_flags: 0,
-                external_flags: (self.mode as u32) << 16 | (!self.is_file as u32) << 4,
-                offset,
-            },
-        )
-        .await?;
-
-        buf.write_all(filename).await?;
-
-        if need_zip64 {
-            write_struct(
-                &mut buf,
-                Zip64FieldWithOffset {
-                    field_type: 1,
-                    field_size: 3 * 8 + 4,
-                    uncompressed_size: self.uncompressed_size,
-                    compressed_size: self.compressed_size,
-                    offset: self.offset,
-                    start_disk: 0,
-                },
-            )
-            .await?;
-        }
-
-        Ok(size)
-    }
-}
-
-// wraps an asyncreader and calculates the hash
-struct HashWrapper<R> {
-    inner: R,
-    hasher: Hasher,
-}
-
-impl<R> HashWrapper<R> {
-    fn new(inner: R) -> Self {
-        Self {
-            inner,
-            hasher: Hasher::new(),
-        }
-    }
-
-    // consumes self and returns the hash and the reader
-    fn finish(self) -> (u32, R) {
-        let crc32 = self.hasher.finalize();
-        (crc32, self.inner)
-    }
-}
-
-impl<R> AsyncRead for HashWrapper<R>
-where
-    R: AsyncRead + Unpin,
-{
-    fn poll_read(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-        buf: &mut ReadBuf<'_>,
-    ) -> Poll<Result<(), io::Error>> {
-        let this = self.get_mut();
-        let old_len = buf.filled().len();
-        ready!(Pin::new(&mut this.inner).poll_read(cx, buf))?;
-        let new_len = buf.filled().len();
-        if new_len > old_len {
-            this.hasher.update(&buf.filled()[old_len..new_len]);
-        }
-        Poll::Ready(Ok(()))
-    }
-}
-
-/// Wraps a writer that implements AsyncWrite for creating a ZIP archive
-///
-/// This will create a ZIP archive on the fly with files added with
-/// 'add_entry'. To Finish the file, call 'finish'
-/// Example:
-/// ```no_run
-/// use anyhow::{Error, Result};
-/// use tokio::fs::File;
-///
-/// use pbs_tools::zip::{ZipEncoder, ZipEntry};
-///
-/// #[tokio::main]
-/// async fn main() -> Result<(), Error> {
-///     let target = File::open("foo.zip").await?;
-///     let mut source = File::open("foo.txt").await?;
-///
-///     let mut zip = ZipEncoder::new(target);
-///     zip.add_entry(ZipEntry::new(
-///         "foo.txt",
-///         0,
-///         0o100755,
-///         true,
-///     ), Some(source)).await?;
-///
-///     zip.finish().await?;
-///
-///     Ok(())
-/// }
-/// ```
-pub struct ZipEncoder<W>
-where
-    W: AsyncWrite + Unpin,
-{
-    byte_count: usize,
-    files: Vec<ZipEntry>,
-    target: Option<W>,
-}
-
-impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
-    pub fn new(target: W) -> Self {
-        Self {
-            byte_count: 0,
-            files: Vec::new(),
-            target: Some(target),
-        }
-    }
-
-    pub async fn add_entry<R: AsyncRead + Unpin>(
-        &mut self,
-        mut entry: ZipEntry,
-        content: Option<R>,
-    ) -> Result<(), Error> {
-        let mut target = self
-            .target
-            .take()
-            .ok_or_else(|| format_err!("had no target during add entry"))?;
-        entry.offset = self.byte_count.try_into()?;
-        self.byte_count += entry.write_local_header(&mut target).await?;
-        if let Some(content) = content {
-            let mut reader = HashWrapper::new(content);
-            let mut enc = DeflateEncoder::with_quality(target, Level::Fastest);
-
-            enc.compress(&mut reader).await?;
-            let total_in = enc.total_in();
-            let total_out = enc.total_out();
-            target = enc.into_inner();
-
-            let (crc32, _reader) = reader.finish();
-
-            self.byte_count += total_out as usize;
-            entry.compressed_size = total_out;
-            entry.uncompressed_size = total_in;
-
-            entry.crc32 = crc32;
-        }
-        self.byte_count += entry.write_data_descriptor(&mut target).await?;
-        self.target = Some(target);
-
-        self.files.push(entry);
-
-        Ok(())
-    }
-
-    async fn write_eocd(
-        &mut self,
-        central_dir_size: usize,
-        central_dir_offset: usize,
-    ) -> Result<(), Error> {
-        let entrycount = self.files.len();
-        let mut target = self
-            .target
-            .take()
-            .ok_or_else(|| format_err!("had no target during write_eocd"))?;
-
-        let mut count = entrycount as u16;
-        let mut directory_size = central_dir_size as u32;
-        let mut directory_offset = central_dir_offset as u32;
-
-        if central_dir_size > u32::MAX as usize
-            || central_dir_offset > u32::MAX as usize
-            || entrycount > u16::MAX as usize
-        {
-            count = 0xFFFF;
-            directory_size = 0xFFFFFFFF;
-            directory_offset = 0xFFFFFFFF;
-
-            write_struct(
-                &mut target,
-                Zip64EOCDRecord {
-                    signature: ZIP64_EOCD_RECORD,
-                    field_size: 44,
-                    version_made_by: VERSION_MADE_BY,
-                    version_needed: VERSION_NEEDED,
-                    disk_number: 0,
-                    disk_number_central_dir: 0,
-                    disk_record_count: entrycount.try_into()?,
-                    total_record_count: entrycount.try_into()?,
-                    directory_size: central_dir_size.try_into()?,
-                    directory_offset: central_dir_offset.try_into()?,
-                },
-            )
-            .await?;
-
-            let locator_offset = central_dir_offset + central_dir_size;
-
-            write_struct(
-                &mut target,
-                Zip64EOCDLocator {
-                    signature: ZIP64_EOCD_LOCATOR,
-                    disk_number: 0,
-                    offset: locator_offset.try_into()?,
-                    disk_count: 1,
-                },
-            )
-            .await?;
-        }
-
-        write_struct(
-            &mut target,
-            EndOfCentralDir {
-                signature: END_OF_CENTRAL_DIR,
-                disk_number: 0,
-                start_disk: 0,
-                disk_record_count: count,
-                total_record_count: count,
-                directory_size,
-                directory_offset,
-                comment_len: 0,
-            },
-        )
-        .await?;
-
-        self.target = Some(target);
-
-        Ok(())
-    }
-
-    pub async fn finish(&mut self) -> Result<(), Error> {
-        let mut target = self
-            .target
-            .take()
-            .ok_or_else(|| format_err!("had no target during finish"))?;
-        let central_dir_offset = self.byte_count;
-        let mut central_dir_size = 0;
-
-        for file in &self.files {
-            central_dir_size += file.write_central_directory_header(&mut target).await?;
-        }
-
-        self.target = Some(target);
-        self.write_eocd(central_dir_size, central_dir_offset)
-            .await?;
-
-        self.target
-            .take()
-            .ok_or_else(|| format_err!("had no target for flush"))?
-            .flush()
-            .await?;
-
-        Ok(())
-    }
-}
-
-/// Zip a local directory and write the encoded data to target. "source" has to point to a valid
-/// directory; its name becomes the root of the zip file - e.g.:
-/// source:
-///         /foo/bar
-/// zip file:
-///         /bar/file1
-///         /bar/dir1
-///         /bar/dir1/file2
-///         ...
-/// ...except if "source" is the root directory
-pub async fn zip_directory<W>(target: W, source: &Path) -> Result<(), Error>
-where
-    W: AsyncWrite + Unpin + Send,
-{
-    use walkdir::WalkDir;
-    use std::os::unix::fs::MetadataExt;
-
-    let base_path = source.parent().unwrap_or_else(|| Path::new("/"));
-    let mut encoder = ZipEncoder::new(target);
-
-    for entry in WalkDir::new(&source).into_iter() {
-        match entry {
-            Ok(entry) => {
-                let entry_path = entry.path().to_owned();
-                let encoder = &mut encoder;
-
-                if let Err(err) = async move {
-                    let entry_path_no_base = entry.path().strip_prefix(base_path)?;
-                    let metadata = entry.metadata()?;
-                    let mtime = match metadata.modified().unwrap_or_else(|_| SystemTime::now()).duration_since(SystemTime::UNIX_EPOCH) {
-                        Ok(dur) => dur.as_secs() as i64,
-                        Err(time_error) => -(time_error.duration().as_secs() as i64)
-                    };
-                    let mode = metadata.mode() as u16;
-
-                    if entry.file_type().is_file() {
-                        let file = tokio::fs::File::open(entry.path()).await?;
-                        let ze = ZipEntry::new(
-                            &entry_path_no_base,
-                            mtime,
-                            mode,
-                            true,
-                        );
-                        encoder.add_entry(ze, Some(file)).await?;
-                    } else if entry.file_type().is_dir() {
-                        let ze = ZipEntry::new(
-                            &entry_path_no_base,
-                            mtime,
-                            mode,
-                            false,
-                        );
-                        let content: Option<tokio::fs::File> = None;
-                        encoder.add_entry(ze, content).await?;
-                    }
-                    // ignore other file types
-                    let ok: Result<(), Error> = Ok(());
-                    ok
-                }
-                .await
-                {
-                    eprintln!(
-                        "zip: error encoding file or directory '{}': {}",
-                        entry_path.display(),
-                        err
-                    );
-                }
-            }
-            Err(err) => {
-                eprintln!("zip: error reading directory entry: {}", err);
-            }
-        }
-    }
-
-    encoder.finish().await
-}
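
Editor's note: the zip helpers deleted in this file are not dropped but move into the new proxmox-async crate; the restore daemon hunk further below switches to `use proxmox_async::zip::zip_directory;`. A minimal usage sketch, assuming the function keeps the signature shown above (an AsyncWrite target plus a source path); the paths are illustrative only:

```rust
use std::path::Path;

use proxmox_async::zip::zip_directory;

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // The archive root is named after the last path component of "source",
    // here "pve" (see the doc comment on zip_directory above).
    let target = tokio::fs::File::create("/tmp/etc-pve.zip").await?;
    zip_directory(target, Path::new("/etc/pve")).await?;
    Ok(())
}
```

Entries that fail to encode are logged to stderr and skipped rather than aborting the whole archive, as the error handling in the removed loop above shows.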
index d83c07a9498c24045dfc44710f946866b826ea12..6b0348c0524fb007d88b0624b4cd39de92da492a 100644 (file)
@@ -23,6 +23,7 @@ pathpatterns = "0.1.2"
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
 proxmox = { version = "0.15.3", features = [ "sortable-macro" ] }
+proxmox-async = "0.1"
 proxmox-router = { version = "1.1", features = [ "cli" ] }
 proxmox-schema = { version = "1", features = [ "api-macro" ] }
 proxmox-time = "1"
@@ -33,5 +34,4 @@ pbs-config = { path = "../pbs-config" }
 pbs-client = { path = "../pbs-client" }
 pbs-datastore = { path = "../pbs-datastore" }
 pbs-fuse-loop = { path = "../pbs-fuse-loop" }
-pbs-runtime = { path = "../pbs-runtime" }
 pbs-tools = { path = "../pbs-tools" }
index c5189e04ca0bce29ed1971e1e438a453465ec7d9..ec479faa12255b975196efa431a5ae6afc665012 100644 (file)
@@ -17,6 +17,7 @@ use proxmox::tools::fs::{file_get_json, replace_file, CreateOptions, image_size}
 use proxmox_router::{ApiMethod, RpcEnvironment, cli::*};
 use proxmox_schema::api;
 use proxmox_time::{strftime_local, epoch_i64};
+use proxmox_async::tokio_writer_adapter::TokioWriterAdapter;
 use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
 
 use pbs_api_types::{
@@ -67,7 +68,6 @@ use pbs_datastore::manifest::{
 };
 use pbs_datastore::read_chunk::AsyncReadChunk;
 use pbs_tools::sync::StdChannelWriter;
-use pbs_tools::tokio::TokioWriterAdapter;
 use pbs_tools::json;
 use pbs_tools::crypt_config::CryptConfig;
 
@@ -486,7 +486,7 @@ fn spawn_catalog_upload(
     encrypt: bool,
 ) -> Result<CatalogUploadResult, Error> {
     let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes
-    let catalog_stream = pbs_tools::blocking::StdChannelStream(catalog_rx);
+    let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx);
     let catalog_chunk_size = 512*1024;
     let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
 
@@ -1524,6 +1524,6 @@ fn main() {
 
     let rpcenv = CliEnvironment::new();
     run_cli_command(cmd_def, rpcenv, Some(|future| {
-        pbs_runtime::main(future)
+        proxmox_async::runtime::main(future)
     }));
 }
index 265622f3584da0940d30be1f90e4cb97addcf4df..f0759d1b69ebe65aa9bed788a95c3e6361692083 100644 (file)
@@ -135,7 +135,7 @@ fn mount(
     if verbose {
         // This will stay in foreground with debug output enabled as None is
         // passed for the RawFd.
-        return pbs_runtime::main(mount_do(param, None));
+        return proxmox_async::runtime::main(mount_do(param, None));
     }
 
     // Process should be daemonized.
@@ -151,7 +151,7 @@ fn mount(
         Ok(ForkResult::Child) => {
             drop(pr);
             nix::unistd::setsid().unwrap();
-            pbs_runtime::main(mount_do(param, Some(pw)))
+            proxmox_async::runtime::main(mount_do(param, Some(pw)))
         }
         Err(_) => bail!("failed to daemonize process"),
     }
index a48ca0c3f1f4e7e11857699c5bf19e6aedcad4cf..bbd66705b4b412d39290454a6fb49a9b18488e3a 100644 (file)
@@ -17,6 +17,7 @@ tokio = { version = "1.6", features = [ "io-std", "rt", "rt-multi-thread", "time
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
 proxmox = { version = "0.15.3" }
+proxmox-async = "0.1"
 proxmox-lang = "1"
 proxmox-router = { version = "1.1", features = [ "cli" ] }
 proxmox-schema = { version = "1", features = [ "api-macro" ] }
@@ -29,5 +30,4 @@ pbs-buildcfg = { path = "../pbs-buildcfg" }
 pbs-config = { path = "../pbs-config" }
 pbs-client = { path = "../pbs-client" }
 pbs-datastore = { path = "../pbs-datastore" }
-pbs-runtime = { path = "../pbs-runtime" }
 pbs-tools = { path = "../pbs-tools" }
index dc507dc10d7d2cf34aaa2596fc0786e43c7063ec..c266d014bf3e98c43ab2452c4d910151bdf1d622 100644 (file)
@@ -478,7 +478,7 @@ fn main() {
     run_cli_command(
         cmd_def,
         rpcenv,
-        Some(|future| pbs_runtime::main(future)),
+        Some(|future| proxmox_async::runtime::main(future)),
     );
 }
 
index 71afbc8ff97699d4a1c97b6d9628d07b6c226242..a6b51a65bd3d202b1cbb46faccb90a1de9f443f2 100644 (file)
@@ -31,6 +31,7 @@ tower-service = "0.3.0"
 url = "2.1"
 
 proxmox = "0.15.3"
+proxmox-async = "0.1"
 proxmox-io = "1"
 proxmox-lang = "1"
 proxmox-http = { version = "0.5.0", features = [ "client" ] }
index f27f703db44bce5f325e8dbab6a4f3eaabfa4aad..3a091026e85fcf1335fe40e29697732c5f3c7c2b 100644 (file)
@@ -33,8 +33,8 @@ use proxmox_schema::{
 
 use proxmox_http::client::RateLimitedStream;
 
-use pbs_tools::compression::{DeflateEncoder, Level};
-use pbs_tools::stream::AsyncReaderStream;
+use proxmox_async::compression::{DeflateEncoder, Level};
+use proxmox_async::stream::AsyncReaderStream;
 
 use crate::{
     ApiConfig, FileLogger, AuthError, RestEnvironment, CompressionMethod,
index 955e3ce350e3c6e3303b752de9b1f53c2b883b4a..e4234c76ffb9b33784462ee3d6d9b641592905d3 100644 (file)
@@ -6,7 +6,7 @@ use futures::*;
 
 use tokio::signal::unix::{signal, SignalKind};
 
-use pbs_tools::broadcast_future::BroadcastData;
+use proxmox_async::broadcast_future::BroadcastData;
 
 use crate::request_shutdown;
 
index 30800e38490d1501daad2cf608162d10a3686979..c7ad9f049f9be72cb011662459ce549fd9c1d454 100644 (file)
@@ -27,12 +27,12 @@ pathpatterns = "0.1.2"
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
 proxmox = { version = "0.15.3", features = [ "sortable-macro" ] }
+proxmox-async = "0.1"
 proxmox-router = { version = "1.1", features = [ "cli" ] }
 proxmox-schema = { version = "1", features = [ "api-macro" ] }
 proxmox-time = "1"
 
 pbs-api-types = { path = "../pbs-api-types" }
-pbs-runtime = { path = "../pbs-runtime" }
 pbs-tools = { path = "../pbs-tools" }
 pbs-datastore = { path = "../pbs-datastore" }
 proxmox-rest-server = { path = "../proxmox-rest-server" }
index a1ad6005e0fa0ae42fb99ca3519bc77f7eb8fbe3..5a4a324e8a581729e6e3b0ce6b08e46c798e443c 100644 (file)
@@ -63,7 +63,7 @@ fn main() -> Result<(), Error> {
 
     info!("disk scan complete, starting main runtime...");
 
-    pbs_runtime::main(run())
+    proxmox_async::runtime::main(run())
 }
 
 /// ensure we have our /run dirs, system users and stuff like that setup
index 429ba359bb5b15c865dfff3dfa6d055d4eec30df..129a18facff2815beb2e3e22cdd5892599a758ff 100644 (file)
@@ -19,13 +19,13 @@ use proxmox_router::{
     ApiHandler, ApiMethod, ApiResponseFuture, Permission, Router, RpcEnvironment, SubdirMap,
 };
 use proxmox_schema::*;
+use proxmox_async::zip::zip_directory;
 
 use pbs_api_types::file_restore::RestoreDaemonStatus;
 use pbs_client::pxar::{create_archive, Flags, PxarCreateOptions, ENCODER_MAX_ENTRIES};
 use pbs_datastore::catalog::{ArchiveEntry, DirEntryAttribute};
 use pbs_tools::fs::read_subdir;
 use pbs_tools::json::required_string_param;
-use pbs_tools::zip::zip_directory;
 
 use pxar::encoder::aio::TokioWriter;
 
index d44a3ffd837af7aa3c6f599df4c15ff5c265ffc0..732b326246b96814d35d8f1b4843d64f376615cd 100644 (file)
@@ -17,10 +17,10 @@ tokio = { version = "1.6", features = [ "rt", "rt-multi-thread" ] }
 
 pathpatterns = "0.1.2"
 proxmox = "0.15.3"
+proxmox-async = "0.1"
 proxmox-schema = { version = "1", features = [ "api-macro" ] }
 proxmox-router = "1.1"
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
 pbs-client = { path = "../pbs-client" }
-pbs-runtime = { path = "../pbs-runtime" }
 pbs-tools = { path = "../pbs-tools" }
index 6f60494d9938a069bb6638c8715667642b0c359d..cb31bf75c07e98c0457e0bd08332ca71c2fffef5 100644 (file)
@@ -488,6 +488,6 @@ fn main() {
 
     let rpcenv = CliEnvironment::new();
     run_cli_command(cmd_def, rpcenv, Some(|future| {
-        pbs_runtime::main(future)
+        proxmox_async::runtime::main(future)
     }));
 }
index 2ebe18c58655970a058cef6a9c2bd07857c39f6c..e95656ce0ba01830c434ebb590a9753d921467e6 100644 (file)
@@ -22,6 +22,8 @@ use proxmox_router::{
 };
 use proxmox_schema::*;
 use proxmox_sys::{task_log, task_warn};
+use proxmox_async::blocking::WrappedReaderStream;
+use proxmox_async::stream::{AsyncReaderStream, AsyncChannelWriter};
 
 use pxar::accessor::aio::Accessor;
 use pxar::EntryKind;
@@ -53,8 +55,6 @@ use pbs_datastore::fixed_index::{FixedIndexReader};
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{BackupManifest, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME};
 use pbs_datastore::prune::compute_prune_info;
-use pbs_tools::blocking::WrappedReaderStream;
-use pbs_tools::stream::{AsyncReaderStream, AsyncChannelWriter};
 use pbs_tools::json::{required_integer_param, required_string_param};
 use pbs_config::CachedUserInfo;
 use proxmox_rest_server::{WorkerTask, formatter};
index eea625002947b53212b8e09ee14dac9cfc1959fb..3a902e7af9a72341507576762f4a56fcbb264307 100644 (file)
@@ -226,7 +226,7 @@ async move {
             };
             if benchmark {
                 env.log("benchmark finished successfully");
-                pbs_runtime::block_in_place(|| env.remove_backup())?;
+                proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
                 return Ok(());
             }
 
@@ -254,13 +254,13 @@ async move {
                 (Ok(_), Err(err)) => {
                     env.log(format!("backup ended and finish failed: {}", err));
                     env.log("removing unfinished backup");
-                    pbs_runtime::block_in_place(|| env.remove_backup())?;
+                    proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
                     Err(err)
                 },
                 (Err(err), Err(_)) => {
                     env.log(format!("backup failed: {}", err));
                     env.log("removing failed backup");
-                    pbs_runtime::block_in_place(|| env.remove_backup())?;
+                    proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
                     Err(err)
                 },
             }
index 75cb44e112171cd866a10d1e37702e994d6665eb..604a4edeecee1e6aff276c1aecb1cbb16031eb78 100644 (file)
@@ -62,7 +62,7 @@ impl Future for UploadChunk {
                         let (is_duplicate, compressed_size) = match proxmox_lang::try_block! {
                             let mut chunk = DataBlob::from_raw(raw_data)?;
 
-                            pbs_runtime::block_in_place(|| {
+                            proxmox_async::runtime::block_in_place(|| {
                                 chunk.verify_unencrypted(this.size as usize, &this.digest)?;
 
                                 // always compute CRC at server side
index a31e73a143943413d14725cba6fe0471e71ac94a..564748a9153f5f34a541f421c6f912e7ab561c70 100644 (file)
@@ -244,7 +244,7 @@ fn apt_get_changelog(
     let changelog_url = &pkg_info[0].change_log_url;
     // FIXME: use 'apt-get changelog' for proxmox packages as well, once repo supports it
     if changelog_url.starts_with("http://download.proxmox.com/") {
-        let changelog = pbs_runtime::block_on(client.get_string(changelog_url, None))
+        let changelog = proxmox_async::runtime::block_on(client.get_string(changelog_url, None))
             .map_err(|err| format_err!("Error downloading changelog from '{}': {}", changelog_url, err))?;
         Ok(json!(changelog))
 
@@ -268,7 +268,7 @@ fn apt_get_changelog(
         auth_header.insert("Authorization".to_owned(),
             format!("Basic {}", base64::encode(format!("{}:{}", key, id))));
 
-        let changelog = pbs_runtime::block_on(client.get_string(changelog_url, Some(&auth_header)))
+        let changelog = proxmox_async::runtime::block_on(client.get_string(changelog_url, Some(&auth_header)))
             .map_err(|err| format_err!("Error downloading changelog from '{}': {}", changelog_url, err))?;
         Ok(json!(changelog))
 
index f418f50fbf3b0d6da35d85ff45bbccab06954f56..169ba6133552d49e8614a39cc6bdd797785fc85b 100644 (file)
@@ -289,7 +289,7 @@ fn download_chunk(
 
         env.debug(format!("download chunk {:?}", path));
 
-        let data = pbs_runtime::block_in_place(|| std::fs::read(path))
+        let data = proxmox_async::runtime::block_in_place(|| std::fs::read(path))
             .map_err(move |err| http_err!(BAD_REQUEST, "reading file {:?} failed: {}", path2, err))?;
 
         let body = Body::from(data);
index 9eb20269c9942e8597f6edae49a132381813634b..c5afe9dc425896d58823c123e8037604e1389ea3 100644 (file)
@@ -21,7 +21,7 @@ use proxmox_backup::config;
 fn main() {
     proxmox_backup::tools::setup_safe_path_env();
 
-    if let Err(err) = pbs_runtime::main(run()) {
+    if let Err(err) = proxmox_async::runtime::main(run()) {
         eprintln!("Error: {}", err);
         std::process::exit(-1);
     }
index 35ef320afa57bf5bb96dd9ba5103df2822db9617..33ac4d50bdbbaadb7e1f5f8f540a2caf9ab9dbac 100644 (file)
@@ -20,5 +20,5 @@ fn main() {
     let mut rpcenv = CliEnvironment::new();
     rpcenv.set_auth_id(Some(format!("{}@pam", username)));
 
-    run_cli_command(cmd_def, rpcenv, Some(|future| pbs_runtime::main(future)));
+    run_cli_command(cmd_def, rpcenv, Some(|future| proxmox_async::runtime::main(future)));
 }
index eea262c56d7a06314f5403adbd998c5bf88190f0..fa075e2ee49a2ba9272556de29b57b67f3f26c48 100644 (file)
@@ -440,7 +440,7 @@ fn main() -> Result<(), Error> {
 
     proxmox_backup::tools::setup_safe_path_env();
 
-    pbs_runtime::main(run())
+    proxmox_async::runtime::main(run())
 }
 
 fn get_sync_job(id: &String) -> Result<SyncJobConfig, Error> {
@@ -499,7 +499,7 @@ pub fn complete_remote_datastore_name(_arg: &str, param: &HashMap<String, String
     let mut list = Vec::new();
 
     if let Some(remote) = get_remote(param) {
-        if let Ok(data) = pbs_runtime::block_on(async move {
+        if let Ok(data) = proxmox_async::runtime::block_on(async move {
                 crate::api2::config::remote::scan_remote_datastores(remote).await
             }) {
 
@@ -518,7 +518,7 @@ pub fn complete_remote_datastore_group(_arg: &str, param: &HashMap<String, Strin
     let mut list = Vec::new();
 
     if let Some((remote, remote_store)) = get_remote_store(param) {
-        if let Ok(data) = pbs_runtime::block_on(async move {
+        if let Ok(data) = proxmox_async::runtime::block_on(async move {
             crate::api2::config::remote::scan_remote_groups(remote.clone(), remote_store.clone()).await
         }) {
 
index 134dc7b724b8be7423e70f4f2212be8ca9e8edce..932b0d78af5b936bf4dd0e8d05b91c60b32feb99 100644 (file)
@@ -85,7 +85,7 @@ fn main() -> Result<(), Error> {
         bail!("proxy not running as backup user or group (got uid {} gid {})", running_uid, running_gid);
     }
 
-    pbs_runtime::main(run())
+    proxmox_async::runtime::main(run())
 }
 
 
@@ -845,7 +845,7 @@ async fn schedule_task_log_rotate() {
 
                 if logrotate.rotate(max_size)? {
                     println!("rotated access log, telling daemons to re-open log file");
-                    pbs_runtime::block_on(command_reopen_access_logfiles())?;
+                    proxmox_async::runtime::block_on(command_reopen_access_logfiles())?;
                     task_log!(worker, "API access log was rotated");
                 } else {
                     task_log!(worker, "API access log was not rotated");
@@ -860,7 +860,7 @@ async fn schedule_task_log_rotate() {
 
                 if logrotate.rotate(max_size)? {
                     println!("rotated auth log, telling daemons to re-open log file");
-                    pbs_runtime::block_on(command_reopen_auth_logfiles())?;
+                    proxmox_async::runtime::block_on(command_reopen_auth_logfiles())?;
                     task_log!(worker, "API authentication log was rotated");
                 } else {
                     task_log!(worker, "API authentication log was not rotated");
index 207453754f37fa83c3a060b516d4ddea6456d30d..03e2ec023639a7613ec69c90ad6732e90960f5de 100644 (file)
@@ -104,7 +104,7 @@ fn main() {
     let mut rpcenv = CliEnvironment::new();
     rpcenv.set_auth_id(Some(String::from("root@pam")));
 
-    if let Err(err) = pbs_runtime::main(run(&mut rpcenv)) {
+    if let Err(err) = proxmox_async::runtime::main(run(&mut rpcenv)) {
         eprintln!("error during update: {}", err);
         std::process::exit(1);
     }
index f49857dd825381c6be2663ff178d7f5d8576a5d4..25a3960119cda4e4789a7fd99c39db0b6ec9d2e7 100644 (file)
@@ -74,7 +74,7 @@ pub fn complete_datastore_group_filter(_arg: &str, param: &HashMap<String, Strin
     list.push("type:vm".to_string());
 
     if let Some(store) =  param.get("store") {
-        let groups = pbs_runtime::block_on(async { get_backup_groups(store).await });
+        let groups = proxmox_async::runtime::block_on(async { get_backup_groups(store).await });
         if let Ok(groups) = groups {
             list.extend(groups.iter().map(|group| format!("group:{}/{}", group.backup_type, group.backup_id)));
         }
@@ -1135,5 +1135,5 @@ fn main() {
     let mut rpcenv = CliEnvironment::new();
     rpcenv.set_auth_id(Some(String::from("root@pam")));
 
-    pbs_runtime::main(run_async_cli_command(cmd_def, rpcenv));
+    proxmox_async::runtime::main(run_async_cli_command(cmd_def, rpcenv));
 }
index 5b9594f3acd61f95e9eed528ea96cc048ed89dad..4b6e9d081a626235ebab57e27414817d77a8ec57 100644 (file)
@@ -23,7 +23,7 @@ const URL_ASCIISET: percent_encoding::AsciiSet = percent_encoding::NON_ALPHANUME
 macro_rules! complete_api_path {
     ($capability:expr) => {
         |complete_me: &str, _map: &HashMap<String, String>| {
-            pbs_runtime::block_on(async { complete_api_path_do(complete_me, $capability).await })
+            proxmox_async::runtime::block_on(async { complete_api_path_do(complete_me, $capability).await })
         }
     };
 }
index c83a626e3c0ac4f2e9284fc33d9d883b99eb8036..97eee1e6d24866ada223612adfdbf0a298ae6a60 100644 (file)
@@ -124,7 +124,7 @@ async fn pull_index_chunks<I: IndexFile>(
             let verify_and_write_channel = verify_and_write_channel.clone();
 
             Ok::<_, Error>(async move {
-                let chunk_exists = pbs_runtime::block_in_place(|| {
+                let chunk_exists = proxmox_async::runtime::block_in_place(|| {
                     target.cond_touch_chunk(&info.digest, false)
                 })?;
                 if chunk_exists {
@@ -136,7 +136,7 @@ async fn pull_index_chunks<I: IndexFile>(
                 let raw_size = chunk.raw_size() as usize;
 
                 // decode, verify and write in a separate threads to maximize throughput
-                pbs_runtime::block_in_place(|| {
+                proxmox_async::runtime::block_in_place(|| {
                     verify_and_write_channel.send((chunk, info.digest, info.size()))
                 })?;
 
index 5b1e6b639a91765e1f709b1ad8e5e7af303a417d..85a37ce2cb1e669c33e7c8eb840f8480806444b4 100644 (file)
@@ -231,7 +231,7 @@ pub fn check_subscription(key: String, server_id: String) -> Result<Subscription
 
     let now = proxmox_time::epoch_i64();
 
-    let (response, challenge) = pbs_runtime::block_on(register_subscription(&key, &server_id, now))
+    let (response, challenge) = proxmox_async::runtime::block_on(register_subscription(&key, &server_id, now))
         .map_err(|err| format_err!("Error checking subscription: {}", err))?;
 
     parse_register_response(&response, key, server_id, now, &challenge)