From 9a1b24b6b1089ab8435a38afe4a3de3f99d7628e Mon Sep 17 00:00:00 2001
From: Dietmar Maurer <dietmar@proxmox.com>
Date: Fri, 19 Nov 2021 17:36:06 +0100
Subject: [PATCH] use new proxmox-async crate

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
---
 Cargo.toml                                  |   3 +-
 Makefile                                    |   1 -
 debian/control                              |   1 +
 examples/download-speed.rs                  |   2 +-
 examples/h2client.rs                        |   2 +-
 examples/h2s-client.rs                      |   2 +-
 examples/h2s-server.rs                      |   2 +-
 examples/h2server.rs                        |   2 +-
 examples/test_chunk_speed2.rs               |   2 +-
 examples/upload-speed.rs                    |   2 +-
 pbs-client/Cargo.toml                       |   2 +-
 pbs-client/src/catalog_shell.rs             |   2 +-
 pbs-client/src/http_client.rs               |   2 +-
 pbs-client/src/pxar/extract.rs              |   2 +-
 pbs-client/src/pxar_backup_stream.rs        |   5 +-
 pbs-client/src/remote_chunk_reader.rs       |   3 +-
 pbs-client/src/tools/mod.rs                 |  10 +-
 pbs-runtime/Cargo.toml                      |  11 -
 pbs-runtime/src/lib.rs                      | 203 ------
 pbs-tools/Cargo.toml                        |   2 +-
 pbs-tools/src/async_lru_cache.rs            |   2 +-
 pbs-tools/src/blocking.rs                   |  99 ---
 pbs-tools/src/broadcast_future.rs           | 180 -----
 pbs-tools/src/compression.rs                | 194 -----
 pbs-tools/src/lib.rs                        |   6 -
 pbs-tools/src/stream.rs                     | 229 ------
 pbs-tools/src/tokio/mod.rs                  |   2 -
 pbs-tools/src/tokio/tokio_writer_adapter.rs |  26 -
 pbs-tools/src/zip.rs                        | 672 ------------------
 proxmox-backup-client/Cargo.toml            |   2 +-
 proxmox-backup-client/src/main.rs           |   6 +-
 proxmox-backup-client/src/mount.rs          |   4 +-
 proxmox-file-restore/Cargo.toml             |   2 +-
 proxmox-file-restore/src/main.rs            |   2 +-
 proxmox-rest-server/Cargo.toml              |   1 +
 proxmox-rest-server/src/rest.rs             |   4 +-
 proxmox-rest-server/src/state.rs            |   2 +-
 proxmox-restore-daemon/Cargo.toml           |   2 +-
 proxmox-restore-daemon/src/main.rs          |   2 +-
 .../src/proxmox_restore_daemon/api.rs       |   2 +-
 pxar-bin/Cargo.toml                         |   2 +-
 pxar-bin/src/main.rs                        |   2 +-
 src/api2/admin/datastore.rs                 |   4 +-
 src/api2/backup/mod.rs                      |   6 +-
 src/api2/backup/upload_chunk.rs             |   2 +-
 src/api2/node/apt.rs                        |   4 +-
 src/api2/reader/mod.rs                      |   2 +-
 src/bin/proxmox-backup-api.rs               |   2 +-
 src/bin/proxmox-backup-debug.rs             |   2 +-
 src/bin/proxmox-backup-manager.rs           |   6 +-
 src/bin/proxmox-backup-proxy.rs             |   6 +-
 src/bin/proxmox-daily-update.rs             |   2 +-
 src/bin/proxmox-tape.rs                     |   4 +-
 src/bin/proxmox_backup_debug/api.rs         |   2 +-
 src/server/pull.rs                          |   4 +-
 src/tools/subscription.rs                   |   2 +-
 56 files changed, 66 insertions(+), 1686 deletions(-)
 delete mode 100644 pbs-runtime/Cargo.toml
 delete mode 100644 pbs-runtime/src/lib.rs
 delete mode 100644 pbs-tools/src/blocking.rs
 delete mode 100644 pbs-tools/src/broadcast_future.rs
 delete mode 100644 pbs-tools/src/compression.rs
 delete mode 100644 pbs-tools/src/stream.rs
 delete mode 100644 pbs-tools/src/tokio/mod.rs
 delete mode 100644 pbs-tools/src/tokio/tokio_writer_adapter.rs
 delete mode 100644 pbs-tools/src/zip.rs
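Note: the bulk of this patch is mechanical — every `pbs_runtime::*` entry point becomes `proxmox_async::runtime::*`. A minimal sketch of the call-site change, with a hypothetical `run` future standing in for any of the example binaries below, assuming a `proxmox-async = "0.1"` dependency:

```rust
use anyhow::Error;

// Hypothetical stand-in for any of the example binaries in this patch.
async fn run() -> Result<(), Error> {
    Ok(())
}

fn main() {
    // Before: pbs_runtime::main(run())
    // After: the same helper, now from the shared proxmox-async crate.
    if let Err(err) = proxmox_async::runtime::main(run()) {
        eprintln!("ERROR: {}", err);
    }
}
```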
diff --git a/Cargo.toml b/Cargo.toml
index 556e28e2..871fdfaa 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,7 +25,6 @@ members = [
     "pbs-config",
     "pbs-datastore",
     "pbs-fuse-loop",
-    "pbs-runtime",
     "proxmox-rest-server",
     "proxmox-rrd",
     "pbs-tape",
@@ -110,6 +109,7 @@ proxmox-sys = "0.1"
 proxmox-acme-rs = "0.3"
 proxmox-apt = "0.8.0"
+proxmox-async = "0.1"
 proxmox-openid = "0.9.0"
 
 pbs-api-types = { path = "pbs-api-types" }
@@ -117,7 +117,6 @@ pbs-buildcfg = { path = "pbs-buildcfg" }
 pbs-client = { path = "pbs-client" }
 pbs-config = { path = "pbs-config" }
 pbs-datastore = { path = "pbs-datastore" }
-pbs-runtime = { path = "pbs-runtime" }
 proxmox-rest-server = { path = "proxmox-rest-server" }
 proxmox-rrd = { path = "proxmox-rrd" }
 pbs-tools = { path = "pbs-tools" }
diff --git a/Makefile b/Makefile
index 20d1382f..6a38931f 100644
--- a/Makefile
+++ b/Makefile
@@ -38,7 +38,6 @@ SUBCRATES := \
 	pbs-config \
 	pbs-datastore \
 	pbs-fuse-loop \
-	pbs-runtime \
 	proxmox-rest-server \
 	proxmox-rrd \
 	pbs-tape \
diff --git a/debian/control b/debian/control
index 780af008..2731ee11 100644
--- a/debian/control
+++ b/debian/control
@@ -44,6 +44,7 @@ Build-Depends: debhelper (>= 12),
  librust-proxmox-0.15+default-dev (>= 0.15.3-~~),
  librust-proxmox-0.15+sortable-macro-dev (>= 0.15.3-~~),
  librust-proxmox-0.15+tokio-dev (>= 0.15.3-~~),
+ librust-proxmox-async-0.1+default-dev,
  librust-proxmox-acme-rs-0.3+default-dev,
  librust-proxmox-apt-0.8+default-dev,
  librust-proxmox-borrow-1+default-dev,
diff --git a/examples/download-speed.rs b/examples/download-speed.rs
index a248ae87..75176db1 100644
--- a/examples/download-speed.rs
+++ b/examples/download-speed.rs
@@ -59,7 +59,7 @@ async fn run() -> Result<(), Error> {
 }
 
 fn main() {
-    if let Err(err) = pbs_runtime::main(run()) {
+    if let Err(err) = proxmox_async::runtime::main(run()) {
         eprintln!("ERROR: {}", err);
     }
     println!("DONE");
diff --git a/examples/h2client.rs b/examples/h2client.rs
index 754a122d..2588631e 100644
--- a/examples/h2client.rs
+++ b/examples/h2client.rs
@@ -69,7 +69,7 @@ fn send_request(
 }
 
 fn main() -> Result<(), Error> {
-    pbs_runtime::main(run())
+    proxmox_async::runtime::main(run())
 }
 
 async fn run() -> Result<(), Error> {
diff --git a/examples/h2s-client.rs b/examples/h2s-client.rs
index 4bd54a15..356dbc59 100644
--- a/examples/h2s-client.rs
+++ b/examples/h2s-client.rs
@@ -69,7 +69,7 @@ fn send_request(
 }
 
 fn main() -> Result<(), Error> {
-    pbs_runtime::main(run())
+    proxmox_async::runtime::main(run())
 }
 
 async fn run() -> Result<(), Error> {
diff --git a/examples/h2s-server.rs b/examples/h2s-server.rs
index a11476a8..f1f08513 100644
--- a/examples/h2s-server.rs
+++ b/examples/h2s-server.rs
@@ -9,7 +9,7 @@ use tokio::net::{TcpListener, TcpStream};
 use pbs_buildcfg::configdir;
 
 fn main() -> Result<(), Error> {
-    pbs_runtime::main(run())
+    proxmox_async::runtime::main(run())
 }
 
 async fn run() -> Result<(), Error> {
diff --git a/examples/h2server.rs b/examples/h2server.rs
index 98e06f52..5802fc88 100644
--- a/examples/h2server.rs
+++ b/examples/h2server.rs
@@ -5,7 +5,7 @@ use hyper::{Body, Request, Response};
 use tokio::net::{TcpListener, TcpStream};
 
 fn main() -> Result<(), Error> {
-    pbs_runtime::main(run())
+    proxmox_async::runtime::main(run())
 }
 
 async fn run() -> Result<(), Error> {
diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs
index f6417532..9274183a 100644
--- a/examples/test_chunk_speed2.rs
+++ b/examples/test_chunk_speed2.rs
@@ -13,7 +13,7 @@ use pbs_client::ChunkStream;
 // Note: I can currently get about 830MB/s
 
 fn main() {
-    if let Err(err) = pbs_runtime::main(run()) {
+    if let Err(err) = proxmox_async::runtime::main(run()) {
         panic!("ERROR: {}", err);
     }
 }
diff --git a/examples/upload-speed.rs b/examples/upload-speed.rs
index b670d060..04e35124 100644
--- a/examples/upload-speed.rs
+++ b/examples/upload-speed.rs
@@ -27,7 +27,7 @@ async fn upload_speed() -> Result<f64, Error> {
 }
 
 fn main() {
-    match pbs_runtime::main(upload_speed()) {
+    match proxmox_async::runtime::main(upload_speed()) {
         Ok(mbs) => {
             println!("average upload speed: {} MB/s", mbs);
         }
diff --git a/pbs-client/Cargo.toml b/pbs-client/Cargo.toml
index 135e4045..a12f512b 100644
--- a/pbs-client/Cargo.toml
+++ b/pbs-client/Cargo.toml
@@ -29,6 +29,7 @@ xdg = "2.2"
 pathpatterns = "0.1.2"
 
 proxmox = "0.15.3"
+proxmox-async = "0.1"
 proxmox-fuse = "0.1.1"
 proxmox-http = { version = "0.5.4", features = [ "client", "http-helpers", "websocket" ] }
 proxmox-io = { version = "1", features = [ "tokio" ] }
@@ -41,5 +42,4 @@ pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-buildcfg = { path = "../pbs-buildcfg" }
 pbs-datastore = { path = "../pbs-datastore" }
-pbs-runtime = { path = "../pbs-runtime" }
 pbs-tools = { path = "../pbs-tools" }
diff --git a/pbs-client/src/catalog_shell.rs b/pbs-client/src/catalog_shell.rs
index dbc23ef6..aab4c989 100644
--- a/pbs-client/src/catalog_shell.rs
+++ b/pbs-client/src/catalog_shell.rs
@@ -19,7 +19,7 @@ use proxmox_router::cli::{self, CliCommand, CliCommandMap, CliHelper, CommandLineInterface};
 use proxmox_schema::api;
 use pxar::{EntryKind, Metadata};
 
-use pbs_runtime::block_in_place;
+use proxmox_async::runtime::block_in_place;
 use pbs_datastore::catalog::{self, DirEntryAttribute};
 
 use crate::pxar::Flags;
diff --git a/pbs-client/src/http_client.rs b/pbs-client/src/http_client.rs
index defaef8a..61f05f28 100644
--- a/pbs-client/src/http_client.rs
+++ b/pbs-client/src/http_client.rs
@@ -22,9 +22,9 @@ use proxmox_router::HttpError;
 use proxmox_http::client::{HttpsConnector, RateLimiter};
 use proxmox_http::uri::build_authority;
 
+use proxmox_async::broadcast_future::BroadcastFuture;
 use pbs_api_types::{Authid, Userid};
-use pbs_tools::broadcast_future::BroadcastFuture;
 use pbs_tools::json::json_object_to_query;
 use pbs_tools::ticket;
 use pbs_tools::percent_encoding::DEFAULT_ENCODE_SET;
diff --git a/pbs-client/src/pxar/extract.rs b/pbs-client/src/pxar/extract.rs
index 52c6bf34..350f96cf 100644
--- a/pbs-client/src/pxar/extract.rs
+++ b/pbs-client/src/pxar/extract.rs
@@ -25,7 +25,7 @@ use proxmox::c_result;
 use proxmox::tools::fs::{create_path, CreateOptions};
 use proxmox_io::{sparse_copy, sparse_copy_async};
 
-use pbs_tools::zip::{ZipEncoder, ZipEntry};
+use proxmox_async::zip::{ZipEncoder, ZipEntry};
 
 use crate::pxar::dir_stack::PxarDirStack;
 use crate::pxar::metadata;
diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
index d39eb6c4..863c3fb4 100644
--- a/pbs-client/src/pxar_backup_stream.rs
+++ b/pbs-client/src/pxar_backup_stream.rs
@@ -12,9 +12,10 @@ use nix::dir::Dir;
 use nix::fcntl::OFlag;
 use nix::sys::stat::Mode;
 
+use proxmox_async::tokio_writer_adapter::TokioWriterAdapter;
+
 use pbs_datastore::catalog::CatalogWriter;
 use pbs_tools::sync::StdChannelWriter;
-use pbs_tools::tokio::TokioWriterAdapter;
 
 /// Stream implementation to encode and upload .pxar archives.
 ///
@@ -111,7 +112,7 @@ impl Stream for PxarBackupStream {
             }
         }
 
-        match pbs_runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
+        match proxmox_async::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
             Ok(data) => Poll::Ready(Some(data)),
             Err(_) => {
                 let error = self.error.lock().unwrap();
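Note: `block_in_place` is the pattern used above in `PxarBackupStream::poll_next` to poll a blocking std channel from a tokio worker thread. A sketch of the same idea as a standalone stream wrapper (hypothetical type, mirroring the `StdChannelStream` removed further down in this patch):

```rust
use std::pin::Pin;
use std::sync::mpsc::Receiver;
use std::task::{Context, Poll};

use futures::stream::Stream;

// Hypothetical wrapper: expose a blocking std channel as an async Stream.
struct BlockingRecvStream<T>(Receiver<T>);

impl<T> Stream for BlockingRecvStream<T> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
        // Tell tokio this worker thread is about to block on recv().
        match proxmox_async::runtime::block_in_place(|| self.0.recv()) {
            Ok(data) => Poll::Ready(Some(data)),
            Err(_) => Poll::Ready(None), // all senders hung up
        }
    }
}
```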
diff --git a/pbs-client/src/remote_chunk_reader.rs b/pbs-client/src/remote_chunk_reader.rs
index 734cd29f..ed7cda2d 100644
--- a/pbs-client/src/remote_chunk_reader.rs
+++ b/pbs-client/src/remote_chunk_reader.rs
@@ -5,12 +5,13 @@ use std::sync::{Arc, Mutex};
 
 use anyhow::{bail, Error};
 
+use proxmox_async::runtime::block_on;
+
 use pbs_tools::crypt_config::CryptConfig;
 use pbs_api_types::CryptMode;
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::read_chunk::ReadChunk;
 use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_runtime::block_on;
 
 use super::BackupReader;
diff --git a/pbs-client/src/tools/mod.rs b/pbs-client/src/tools/mod.rs
index 539ad662..a38b1c87 100644
--- a/pbs-client/src/tools/mod.rs
+++ b/pbs-client/src/tools/mod.rs
@@ -194,7 +194,7 @@ pub async fn try_get(repo: &BackupRepository, url: &str) -> Value {
 }
 
 pub fn complete_backup_group(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
-    pbs_runtime::main(async { complete_backup_group_do(param).await })
+    proxmox_async::runtime::main(async { complete_backup_group_do(param).await })
 }
 
 pub async fn complete_backup_group_do(param: &HashMap<String, String>) -> Vec<String> {
@@ -224,7 +224,7 @@ pub async fn complete_backup_group_do(param: &HashMap<String, String>) -> Vec<String> {
 }
 
 pub fn complete_group_or_snapshot(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
-    pbs_runtime::main(async { complete_group_or_snapshot_do(arg, param).await })
+    proxmox_async::runtime::main(async { complete_group_or_snapshot_do(arg, param).await })
 }
 
 pub async fn complete_group_or_snapshot_do(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
@@ -243,7 +243,7 @@ pub async fn complete_group_or_snapshot_do(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
 }
 
 pub fn complete_backup_snapshot(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
-    pbs_runtime::main(async { complete_backup_snapshot_do(param).await })
+    proxmox_async::runtime::main(async { complete_backup_snapshot_do(param).await })
 }
 
 pub async fn complete_backup_snapshot_do(param: &HashMap<String, String>) -> Vec<String> {
@@ -275,7 +275,7 @@ pub async fn complete_backup_snapshot_do(param: &HashMap<String, String>) -> Vec<String> {
 }
 
 pub fn complete_server_file_name(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
-    pbs_runtime::main(async { complete_server_file_name_do(param).await })
+    proxmox_async::runtime::main(async { complete_server_file_name_do(param).await })
 }
 
 pub async fn complete_server_file_name_do(param: &HashMap<String, String>) -> Vec<String> {
@@ -366,7 +366,7 @@ pub fn complete_chunk_size(_arg: &str, _param: &HashMap<String, String>) -> Vec<String> {
 }
 
 pub fn complete_auth_id(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
-    pbs_runtime::main(async { complete_auth_id_do(param).await })
+    proxmox_async::runtime::main(async { complete_auth_id_do(param).await })
 }
 
 pub async fn complete_auth_id_do(param: &HashMap<String, String>) -> Vec<String> {
diff --git a/pbs-runtime/Cargo.toml b/pbs-runtime/Cargo.toml
deleted file mode 100644
index 72c25ac2..00000000
--- a/pbs-runtime/Cargo.toml
+++ /dev/null
@@ -1,11 +0,0 @@
-[package]
-name = "pbs-runtime"
-version = "0.1.0"
-authors = ["Proxmox Support Team <support@proxmox.com>"]
-edition = "2018"
-description = "tokio runtime related helpers required for binaries"
-
-[dependencies]
-lazy_static = "1.4"
-pin-utils = "0.1.0"
-tokio = { version = "1.6", features = [ "rt", "rt-multi-thread" ] }
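Note: `block_on` (imported above in remote_chunk_reader.rs) drives a future to completion from synchronous code, reusing or spawning the shared runtime as needed. A minimal sketch:

```rust
// Synchronous entry point, e.g. a ReadChunk impl backed by async I/O.
fn read_blob_sync() -> Result<Vec<u8>, anyhow::Error> {
    proxmox_async::runtime::block_on(async {
        // any future works here; a fixed value stands in for a chunk download
        Ok(vec![0u8; 4096])
    })
}
```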
diff --git a/pbs-runtime/src/lib.rs b/pbs-runtime/src/lib.rs
deleted file mode 100644
index baa7ded0..00000000
--- a/pbs-runtime/src/lib.rs
+++ /dev/null
@@ -1,203 +0,0 @@
-//! Helpers for quirks of the current tokio runtime.
-
-use std::cell::RefCell;
-use std::future::Future;
-use std::sync::{Arc, Weak, Mutex};
-use std::task::{Context, Poll, RawWaker, Waker};
-use std::thread::{self, Thread};
-
-use lazy_static::lazy_static;
-use pin_utils::pin_mut;
-use tokio::runtime::{self, Runtime};
-
-thread_local! {
-    static BLOCKING: RefCell<bool> = RefCell::new(false);
-}
-
-fn is_in_tokio() -> bool {
-    tokio::runtime::Handle::try_current()
-        .is_ok()
-}
-
-fn is_blocking() -> bool {
-    BLOCKING.with(|v| *v.borrow())
-}
-
-struct BlockingGuard(bool);
-
-impl BlockingGuard {
-    fn set() -> Self {
-        Self(BLOCKING.with(|v| {
-            let old = *v.borrow();
-            *v.borrow_mut() = true;
-            old
-        }))
-    }
-}
-
-impl Drop for BlockingGuard {
-    fn drop(&mut self) {
-        BLOCKING.with(|v| {
-            *v.borrow_mut() = self.0;
-        });
-    }
-}
-
-lazy_static! {
-    // avoid openssl bug: https://github.com/openssl/openssl/issues/6214
-    // by dropping the runtime as early as possible
-    static ref RUNTIME: Mutex<Weak<Runtime>> = Mutex::new(Weak::new());
-}
-
-#[link(name = "crypto")]
-extern "C" {
-    fn OPENSSL_thread_stop();
-}
-
-/// Get or create the current main tokio runtime.
-///
-/// This makes sure that tokio's worker threads are marked for us so that we know whether we
-/// can/need to use `block_in_place` in our `block_on` helper.
-pub fn get_runtime_with_builder<F: Fn() -> runtime::Builder>(get_builder: F) -> Arc<Runtime> {
-
-    let mut guard = RUNTIME.lock().unwrap();
-
-    if let Some(rt) = guard.upgrade() { return rt; }
-
-    let mut builder = get_builder();
-    builder.on_thread_stop(|| {
-        // avoid openssl bug: https://github.com/openssl/openssl/issues/6214
-        // call OPENSSL_thread_stop to avoid race with openssl cleanup handlers
-        unsafe { OPENSSL_thread_stop(); }
-    });
-
-    let runtime = builder.build().expect("failed to spawn tokio runtime");
-    let rt = Arc::new(runtime);
-
-    *guard = Arc::downgrade(&rt);
-
-    rt
-}
-
-/// Get or create the current main tokio runtime.
-///
-/// This calls get_runtime_with_builder() using the tokio default threaded scheduler
-pub fn get_runtime() -> Arc<Runtime> {
-
-    get_runtime_with_builder(|| {
-        let mut builder = runtime::Builder::new_multi_thread();
-        builder.enable_all();
-        builder
-    })
-}
-
-
-/// Block on a synchronous piece of code.
-pub fn block_in_place<R>(fut: impl FnOnce() -> R) -> R {
-    // don't double-exit the context (tokio doesn't like that)
-    // also, if we're not actually in a tokio-worker we must not use block_in_place() either
-    if is_blocking() || !is_in_tokio() {
-        fut()
-    } else {
-        // we are in an actual tokio worker thread, block it:
-        tokio::task::block_in_place(move || {
-            let _guard = BlockingGuard::set();
-            fut()
-        })
-    }
-}
-
-/// Block on a future in this thread.
-pub fn block_on<F: Future>(fut: F) -> F::Output {
-    // don't double-exit the context (tokio doesn't like that)
-    if is_blocking() {
-        block_on_local_future(fut)
-    } else if is_in_tokio() {
-        // inside a tokio worker we need to tell tokio that we're about to really block:
-        tokio::task::block_in_place(move || {
-            let _guard = BlockingGuard::set();
-            block_on_local_future(fut)
-        })
-    } else {
-        // not a worker thread, not associated with a runtime, make sure we have a runtime (spawn
-        // it on demand if necessary), then enter it
-        let _guard = BlockingGuard::set();
-        let _enter_guard = get_runtime().enter();
-        get_runtime().block_on(fut)
-    }
-}
-
-/*
-fn block_on_impl<F>(mut fut: F) -> F::Output
-where
-    F: Future + Send,
-    F::Output: Send + 'static,
-{
-    let (tx, rx) = tokio::sync::oneshot::channel();
-    let fut_ptr = &mut fut as *mut F as usize; // hack to not require F to be 'static
-    tokio::spawn(async move {
-        let fut: F = unsafe { std::ptr::read(fut_ptr as *mut F) };
-        tx
-            .send(fut.await)
-            .map_err(drop)
-            .expect("failed to send block_on result to channel")
-    });
-
-    futures::executor::block_on(async move {
-        rx.await.expect("failed to receive block_on result from channel")
-    })
-    std::mem::forget(fut);
-}
-*/
-
-/// This used to be our tokio main entry point. Now this just calls out to `block_on` for
-/// compatibility, which will perform all the necessary tasks on-demand anyway.
-pub fn main<F: Future>(fut: F) -> F::Output {
-    block_on(fut)
-}
-
-fn block_on_local_future<F: Future>(fut: F) -> F::Output {
-    pin_mut!(fut);
-
-    let waker = Arc::new(thread::current());
-    let waker = thread_waker_clone(Arc::into_raw(waker) as *const ());
-    let waker = unsafe { Waker::from_raw(waker) };
-    let mut context = Context::from_waker(&waker);
-    loop {
-        match fut.as_mut().poll(&mut context) {
-            Poll::Ready(out) => return out,
-            Poll::Pending => thread::park(),
-        }
-    }
-}
-
-const THREAD_WAKER_VTABLE: std::task::RawWakerVTable = std::task::RawWakerVTable::new(
-    thread_waker_clone,
-    thread_waker_wake,
-    thread_waker_wake_by_ref,
-    thread_waker_drop,
-);
-
-fn thread_waker_clone(this: *const ()) -> RawWaker {
-    let this = unsafe { Arc::from_raw(this as *const Thread) };
-    let cloned = Arc::clone(&this);
-    let _ = Arc::into_raw(this);
-
-    RawWaker::new(Arc::into_raw(cloned) as *const (), &THREAD_WAKER_VTABLE)
-}
-
-fn thread_waker_wake(this: *const ()) {
-    let this = unsafe { Arc::from_raw(this as *const Thread) };
-    this.unpark();
-}
-
-fn thread_waker_wake_by_ref(this: *const ()) {
-    let this = unsafe { Arc::from_raw(this as *const Thread) };
-    this.unpark();
-    let _ = Arc::into_raw(this);
-}
-
-fn thread_waker_drop(this: *const ()) {
-    let this = unsafe { Arc::from_raw(this as *const Thread) };
-    drop(this);
-}
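Note: the deleted module also exposed `get_runtime_with_builder` for callers that need a custom tokio builder. Assuming it kept its name under `proxmox_async::runtime` (this hunk only shows the deletion), usage would look like:

```rust
use std::sync::Arc;
use tokio::runtime::{self, Runtime};

// A sketch: request the shared runtime with a bounded worker count.
fn small_runtime() -> Arc<Runtime> {
    proxmox_async::runtime::get_runtime_with_builder(|| {
        let mut builder = runtime::Builder::new_multi_thread();
        builder.worker_threads(2);
        builder.enable_all();
        builder
    })
}
```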
diff --git a/pbs-tools/Cargo.toml b/pbs-tools/Cargo.toml
index 14f61f48..d0c6b3ff 100644
--- a/pbs-tools/Cargo.toml
+++ b/pbs-tools/Cargo.toml
@@ -33,13 +33,13 @@ walkdir = "2"
 zstd = { version = "0.6", features = [ "bindgen" ] }
 
 proxmox = { version = "0.15.3", default-features = false, features = [ "tokio" ] }
+proxmox-async = "0.1"
 proxmox-borrow = "1"
 proxmox-io = { version = "1", features = [ "tokio" ] }
 proxmox-lang = { version = "1" }
 proxmox-time = { version = "1" }
 
 pbs-buildcfg = { path = "../pbs-buildcfg" }
-pbs-runtime = { path = "../pbs-runtime" }
 
 [dev-dependencies]
 tokio = { version = "1.6", features = [ "macros" ] }
diff --git a/pbs-tools/src/async_lru_cache.rs b/pbs-tools/src/async_lru_cache.rs
index 315a7862..be4ea891 100644
--- a/pbs-tools/src/async_lru_cache.rs
+++ b/pbs-tools/src/async_lru_cache.rs
@@ -7,7 +7,7 @@ use std::collections::HashMap;
 use std::future::Future;
 use std::sync::{Arc, Mutex};
 
-use crate::broadcast_future::BroadcastFuture;
+use proxmox_async::broadcast_future::BroadcastFuture;
 use crate::lru_cache::LruCache;
 
 /// Interface for asynchronously getting values on cache misses.
diff --git a/pbs-tools/src/blocking.rs b/pbs-tools/src/blocking.rs
deleted file mode 100644
index f5828dfb..00000000
--- a/pbs-tools/src/blocking.rs
+++ /dev/null
@@ -1,99 +0,0 @@
-//! Async wrappers for blocking I/O (adding `block_in_place` around channels/readers)
-
-use std::io::{self, Read};
-use std::pin::Pin;
-use std::task::{Context, Poll};
-use std::sync::mpsc::Receiver;
-
-use futures::stream::Stream;
-
-use pbs_runtime::block_in_place;
-
-/// Wrapper struct to convert a Reader into a Stream
-pub struct WrappedReaderStream<R: Read + Unpin> {
-    reader: R,
-    buffer: Vec<u8>,
-}
-
-impl<R: Read + Unpin> WrappedReaderStream<R> {
-
-    pub fn new(reader: R) -> Self {
-        let mut buffer = Vec::with_capacity(64*1024);
-        unsafe { buffer.set_len(buffer.capacity()); }
-        Self { reader, buffer }
-    }
-}
-
-impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
-    type Item = Result<Vec<u8>, io::Error>;
-
-    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
-        let this = self.get_mut();
-        match block_in_place(|| this.reader.read(&mut this.buffer)) {
-            Ok(n) => {
-                if n == 0 {
-                    // EOF
-                    Poll::Ready(None)
-                } else {
-                    Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
-                }
-            }
-            Err(err) => Poll::Ready(Some(Err(err))),
-        }
-    }
-}
-
-/// Wrapper struct to convert a channel Receiver into a Stream
-pub struct StdChannelStream<T>(pub Receiver<T>);
-
-impl<T> Stream for StdChannelStream<T> {
-    type Item = T;
-
-    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
-        match block_in_place(|| self.0.recv()) {
-            Ok(data) => Poll::Ready(Some(data)),
-            Err(_) => Poll::Ready(None), // channel closed
-        }
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use std::io;
-
-    use anyhow::Error;
-    use futures::stream::TryStreamExt;
-
-    #[test]
-    fn test_wrapped_stream_reader() -> Result<(), Error> {
-        pbs_runtime::main(async {
-            run_wrapped_stream_reader_test().await
-        })
-    }
-
-    struct DummyReader(usize);
-
-    impl io::Read for DummyReader {
-        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-            self.0 += 1;
-
-            if self.0 >= 10 {
-                return Ok(0);
-            }
-
-            unsafe {
-                std::ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len());
-            }
-
-            Ok(buf.len())
-        }
-    }
-
-    async fn run_wrapped_stream_reader_test() -> Result<(), Error> {
-        let mut reader = super::WrappedReaderStream::new(DummyReader(0));
-        while let Some(_data) = reader.try_next().await? {
-            // just waiting
-        }
-        Ok(())
-    }
-}
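Note: `WrappedReaderStream` survives the move unchanged; the datastore API further down now imports it from `proxmox_async::blocking`. A short usage sketch:

```rust
use futures::stream::TryStreamExt;

// Drain a blocking std reader as a stream of Vec<u8> chunks.
async fn count_bytes<R>(reader: R) -> Result<usize, std::io::Error>
where
    R: std::io::Read + Unpin,
{
    let mut stream = proxmox_async::blocking::WrappedReaderStream::new(reader);
    let mut total = 0;
    while let Some(chunk) = stream.try_next().await? {
        total += chunk.len();
    }
    Ok(total)
}
```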
diff --git a/pbs-tools/src/broadcast_future.rs b/pbs-tools/src/broadcast_future.rs
deleted file mode 100644
index 7bfd83b7..00000000
--- a/pbs-tools/src/broadcast_future.rs
+++ /dev/null
@@ -1,180 +0,0 @@
-use std::future::Future;
-use std::pin::Pin;
-use std::sync::{Arc, Mutex};
-
-use anyhow::{format_err, Error};
-use futures::future::{FutureExt, TryFutureExt};
-use tokio::sync::oneshot;
-
-/// Broadcast results to registered listeners using async oneshot channels
-#[derive(Default)]
-pub struct BroadcastData<T> {
-    result: Option<Result<T, String>>,
-    listeners: Vec<oneshot::Sender<Result<T, Error>>>,
-}
-
-impl<T: Clone> BroadcastData<T> {
-
-    pub fn new() -> Self {
-        Self {
-            result: None,
-            listeners: vec![],
-        }
-    }
-
-    pub fn notify_listeners(&mut self, result: Result<T, String>) {
-
-        self.result = Some(result.clone());
-
-        loop {
-            match self.listeners.pop() {
-                None => { break; },
-                Some(ch) => {
-                    match &result {
-                        Ok(result) => { let _ = ch.send(Ok(result.clone())); },
-                        Err(err) => { let _ = ch.send(Err(format_err!("{}", err))); },
-                    }
-                },
-            }
-        }
-    }
-
-    pub fn listen(&mut self) -> impl Future<Output = Result<T, Error>> {
-        use futures::future::{ok, Either};
-
-        match &self.result {
-            None => {},
-            Some(Ok(result)) => return Either::Left(ok(result.clone())),
-            Some(Err(err)) => return Either::Left(futures::future::err(format_err!("{}", err))),
-        }
-
-        let (tx, rx) = oneshot::channel::<Result<T, Error>>();
-
-        self.listeners.push(tx);
-
-        Either::Right(rx
-            .map(|res| match res {
-                Ok(Ok(t)) => Ok(t),
-                Ok(Err(e)) => Err(e),
-                Err(e) => Err(Error::from(e)),
-            })
-        )
-    }
-}
-
-type SourceFuture<T> = Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>;
-
-struct BroadCastFutureBinding<T> {
-    broadcast: BroadcastData<T>,
-    future: Option<SourceFuture<T>>,
-}
-
-/// Broadcast future results to registered listeners
-pub struct BroadcastFuture<T> {
-    inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
-}
-
-impl<T: Clone + Send + 'static> BroadcastFuture<T> {
-    /// Create instance for specified source future.
-    ///
-    /// The result of the future is sent to all registered listeners.
-    pub fn new(source: Box<dyn Future<Output = Result<T, Error>> + Send>) -> Self {
-        let inner = BroadCastFutureBinding {
-            broadcast: BroadcastData::new(),
-            future: Some(Pin::from(source)),
-        };
-        Self { inner: Arc::new(Mutex::new(inner)) }
-    }
-
-    /// Creates a new instance with a oneshot channel as trigger
-    pub fn new_oneshot() -> (Self, oneshot::Sender<Result<T, Error>>) {
-
-        let (tx, rx) = oneshot::channel::<Result<T, Error>>();
-        let rx = rx
-            .map_err(Error::from)
-            .and_then(futures::future::ready);
-
-        (Self::new(Box::new(rx)), tx)
-    }
-
-    fn notify_listeners(
-        inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
-        result: Result<T, String>,
-    ) {
-        let mut data = inner.lock().unwrap();
-        data.broadcast.notify_listeners(result);
-    }
-
-    fn spawn(inner: Arc<Mutex<BroadCastFutureBinding<T>>>) -> impl Future<Output = Result<T, Error>> {
-        let mut data = inner.lock().unwrap();
-
-        if let Some(source) = data.future.take() {
-
-            let inner1 = inner.clone();
-
-            let task = source.map(move |value| {
-                match value {
-                    Ok(value) => Self::notify_listeners(inner1, Ok(value)),
-                    Err(err) => Self::notify_listeners(inner1, Err(err.to_string())),
-                }
-            });
-            tokio::spawn(task);
-        }
-
-        data.broadcast.listen()
-    }
-
-    /// Register a listener
-    pub fn listen(&self) -> impl Future<Output = Result<T, Error>> {
-        let inner2 = self.inner.clone();
-        async move { Self::spawn(inner2).await }
-    }
-}
-
-#[test]
-fn test_broadcast_future() {
-    use std::sync::atomic::{AtomicUsize, Ordering};
-
-    static CHECKSUM: AtomicUsize = AtomicUsize::new(0);
-
-    let (sender, trigger) = BroadcastFuture::new_oneshot();
-
-    let receiver1 = sender.listen()
-        .map_ok(|res| {
-            CHECKSUM.fetch_add(res, Ordering::SeqCst);
-        })
-        .map_err(|err| { panic!("got error {}", err); })
-        .map(|_| ());
-
-    let receiver2 = sender.listen()
-        .map_ok(|res| {
-            CHECKSUM.fetch_add(res*2, Ordering::SeqCst);
-        })
-        .map_err(|err| { panic!("got error {}", err); })
-        .map(|_| ());
-
-    let rt = tokio::runtime::Runtime::new().unwrap();
-    rt.block_on(async move {
-        let r1 = tokio::spawn(receiver1);
-        let r2 = tokio::spawn(receiver2);
-
-        trigger.send(Ok(1)).unwrap();
-        let _ = r1.await;
-        let _ = r2.await;
-    });
-
-    let result = CHECKSUM.load(Ordering::SeqCst);
-
-    assert_eq!(result, 3);
-
-    // the result stays available until the BroadcastFuture is dropped
-    rt.block_on(sender.listen()
-        .map_ok(|res| {
-            CHECKSUM.fetch_add(res*4, Ordering::SeqCst);
-        })
-        .map_err(|err| { panic!("got error {}", err); })
-        .map(|_| ()));
-
-    let result = CHECKSUM.load(Ordering::SeqCst);
-    assert_eq!(result, 7);
-}
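Note: only the import path of `BroadcastFuture` changes (see the http_client.rs and state.rs hunks); the API is exactly what the deleted unit test above exercises. A compact sketch:

```rust
use proxmox_async::broadcast_future::BroadcastFuture;

// One source future, many listeners; each listener gets a clone of the result.
async fn demo() -> Result<(), anyhow::Error> {
    let (shared, trigger) = BroadcastFuture::<usize>::new_oneshot();
    let first = shared.listen();
    let second = shared.listen();

    trigger.send(Ok(42)).unwrap();

    assert_eq!(first.await?, 42);
    assert_eq!(second.await?, 42);
    Ok(())
}
```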
diff --git a/pbs-tools/src/compression.rs b/pbs-tools/src/compression.rs
deleted file mode 100644
index aa1b0b24..00000000
--- a/pbs-tools/src/compression.rs
+++ /dev/null
@@ -1,194 +0,0 @@
-use std::io;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-use anyhow::Error;
-use bytes::Bytes;
-use flate2::{Compress, Compression, FlushCompress};
-use futures::ready;
-use futures::stream::Stream;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
-
-use proxmox::io_format_err;
-use proxmox_io::ByteBuffer;
-
-const BUFFER_SIZE: usize = 8192;
-
-pub enum Level {
-    Fastest,
-    Best,
-    Default,
-    Precise(u32),
-}
-
-#[derive(Eq, PartialEq)]
-enum EncoderState {
-    Reading,
-    Writing,
-    Flushing,
-    Finished,
-}
-
-pub struct DeflateEncoder<T> {
-    inner: T,
-    compressor: Compress,
-    buffer: ByteBuffer,
-    input_buffer: Bytes,
-    state: EncoderState,
-}
-
-impl<T> DeflateEncoder<T> {
-    pub fn new(inner: T) -> Self {
-        Self::with_quality(inner, Level::Default)
-    }
-
-    pub fn with_quality(inner: T, level: Level) -> Self {
-        let level = match level {
-            Level::Fastest => Compression::fast(),
-            Level::Best => Compression::best(),
-            Level::Default => Compression::new(3),
-            Level::Precise(val) => Compression::new(val),
-        };
-
-        Self {
-            inner,
-            compressor: Compress::new(level, false),
-            buffer: ByteBuffer::with_capacity(BUFFER_SIZE),
-            input_buffer: Bytes::new(),
-            state: EncoderState::Reading,
-        }
-    }
-
-    pub fn total_in(&self) -> u64 {
-        self.compressor.total_in()
-    }
-
-    pub fn total_out(&self) -> u64 {
-        self.compressor.total_out()
-    }
-
-    pub fn into_inner(self) -> T {
-        self.inner
-    }
-
-    fn encode(
-        &mut self,
-        inbuf: &[u8],
-        flush: FlushCompress,
-    ) -> Result<(usize, flate2::Status), io::Error> {
-        let old_in = self.compressor.total_in();
-        let old_out = self.compressor.total_out();
-        let res = self
-            .compressor
-            .compress(&inbuf[..], self.buffer.get_free_mut_slice(), flush)?;
-        let new_in = (self.compressor.total_in() - old_in) as usize;
-        let new_out = (self.compressor.total_out() - old_out) as usize;
-        self.buffer.add_size(new_out);
-
-        Ok((new_in, res))
-    }
-}
-
-impl DeflateEncoder<Vec<u8>> {
-    // assume small files
-    pub async fn compress_vec<R>(&mut self, reader: &mut R, size_hint: usize) -> Result<(), Error>
-    where
-        R: AsyncRead + Unpin,
-    {
-        let mut buffer = Vec::with_capacity(size_hint);
-        reader.read_to_end(&mut buffer).await?;
-        self.inner.reserve(size_hint); // should be enough since we want smaller files
-        self.compressor.compress_vec(&buffer[..], &mut self.inner, FlushCompress::Finish)?;
-        Ok(())
-    }
-}
-
-impl<T: AsyncWrite + Unpin> DeflateEncoder<T> {
-    pub async fn compress<R>(&mut self, reader: &mut R) -> Result<(), Error>
-    where
-        R: AsyncRead + Unpin,
-    {
-        let mut buffer = ByteBuffer::with_capacity(BUFFER_SIZE);
-        let mut eof = false;
-        loop {
-            if !eof && !buffer.is_full() {
-                let read = buffer.read_from_async(reader).await?;
-                if read == 0 {
-                    eof = true;
-                }
-            }
-            let (read, _res) = self.encode(&buffer[..], FlushCompress::None)?;
-            buffer.consume(read);
-
-            self.inner.write_all(&self.buffer[..]).await?;
-            self.buffer.clear();
-
-            if buffer.is_empty() && eof {
-                break;
-            }
-        }
-
-        loop {
-            let (_read, res) = self.encode(&[][..], FlushCompress::Finish)?;
-            self.inner.write_all(&self.buffer[..]).await?;
-            self.buffer.clear();
-            if res == flate2::Status::StreamEnd {
-                break;
-            }
-        }
-
-        Ok(())
-    }
-}
-
-impl<T, O> Stream for DeflateEncoder<T>
-where
-    T: Stream<Item = Result<O, io::Error>> + Unpin,
-    O: Into<Bytes>
-{
-    type Item = Result<Bytes, io::Error>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let this = self.get_mut();
-
-        loop {
-            match this.state {
-                EncoderState::Reading => {
-                    if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) {
-                        let buf = res?;
-                        this.input_buffer = buf.into();
-                        this.state = EncoderState::Writing;
-                    } else {
-                        this.state = EncoderState::Flushing;
-                    }
-                }
-                EncoderState::Writing => {
-                    if this.input_buffer.is_empty() {
-                        return Poll::Ready(Some(Err(io_format_err!("empty input during write"))));
-                    }
-                    let mut buf = this.input_buffer.split_off(0);
-                    let (read, res) = this.encode(&buf[..], FlushCompress::None)?;
-                    this.input_buffer = buf.split_off(read);
-                    if this.input_buffer.is_empty() {
-                        this.state = EncoderState::Reading;
-                    }
-                    if this.buffer.is_full() || res == flate2::Status::BufError {
-                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
-                        return Poll::Ready(Some(Ok(bytes.into())));
-                    }
-                }
-                EncoderState::Flushing => {
-                    let (_read, res) = this.encode(&[][..], FlushCompress::Finish)?;
-                    if !this.buffer.is_empty() {
-                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
-                        return Poll::Ready(Some(Ok(bytes.into())));
-                    }
-                    if res == flate2::Status::StreamEnd {
-                        this.state = EncoderState::Finished;
-                    }
-                }
-                EncoderState::Finished => return Poll::Ready(None),
-            }
-        }
-    }
-}
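Note: `DeflateEncoder` moves to `proxmox_async::compression` (see the rest.rs hunk below). A sketch of in-memory compression via the `compress_vec` helper from the deleted code:

```rust
use proxmox_async::compression::{DeflateEncoder, Level};

// Deflate a small buffer; the Vec<u8> target collects the output.
async fn deflate(data: &[u8]) -> Result<Vec<u8>, anyhow::Error> {
    let mut enc = DeflateEncoder::with_quality(Vec::new(), Level::Default);
    enc.compress_vec(&mut &data[..], data.len()).await?;
    Ok(enc.into_inner())
}
```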
diff --git a/pbs-tools/src/lib.rs b/pbs-tools/src/lib.rs
index 3b470109..9ea4ea5c 100644
--- a/pbs-tools/src/lib.rs
+++ b/pbs-tools/src/lib.rs
@@ -1,9 +1,6 @@
 pub mod acl;
-pub mod blocking;
-pub mod broadcast_future;
 pub mod cert;
 pub mod cli;
-pub mod compression;
 pub mod crypt_config;
 pub mod format;
 pub mod fs;
@@ -14,13 +11,10 @@ pub mod nom;
 pub mod percent_encoding;
 pub mod sha;
 pub mod str;
-pub mod stream;
 pub mod sync;
 pub mod sys;
 pub mod ticket;
-pub mod tokio;
 pub mod xattr;
-pub mod zip;
 
 pub mod async_lru_cache;
diff --git a/pbs-tools/src/stream.rs b/pbs-tools/src/stream.rs
deleted file mode 100644
index cfee01cc..00000000
--- a/pbs-tools/src/stream.rs
+++ /dev/null
@@ -1,229 +0,0 @@
-//! Wrappers between async readers and streams.
-
-use std::io::{self, Read};
-use std::future::Future;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-use anyhow::{Error, Result};
-use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
-use tokio::sync::mpsc::Sender;
-use futures::ready;
-use futures::future::FutureExt;
-use futures::stream::Stream;
-
-use proxmox::io_format_err;
-use proxmox::sys::error::io_err_other;
-use proxmox_io::ByteBuffer;
-
-use pbs_runtime::block_in_place;
-
-/// Wrapper struct to convert a Reader into a Stream
-pub struct WrappedReaderStream<R: Read + Unpin> {
-    reader: R,
-    buffer: Vec<u8>,
-}
-
-impl<R: Read + Unpin> WrappedReaderStream<R> {
-
-    pub fn new(reader: R) -> Self {
-        let mut buffer = Vec::with_capacity(64*1024);
-        unsafe { buffer.set_len(buffer.capacity()); }
-        Self { reader, buffer }
-    }
-}
-
-impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
-    type Item = Result<Vec<u8>, io::Error>;
-
-    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
-        let this = self.get_mut();
-        match block_in_place(|| this.reader.read(&mut this.buffer)) {
-            Ok(n) => {
-                if n == 0 {
-                    // EOF
-                    Poll::Ready(None)
-                } else {
-                    Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
-                }
-            }
-            Err(err) => Poll::Ready(Some(Err(err))),
-        }
-    }
-}
-
-/// Wrapper struct to convert an AsyncReader into a Stream
-pub struct AsyncReaderStream<R: AsyncRead + Unpin> {
-    reader: R,
-    buffer: Vec<u8>,
-}
-
-impl<R: AsyncRead + Unpin> AsyncReaderStream<R> {
-
-    pub fn new(reader: R) -> Self {
-        let mut buffer = Vec::with_capacity(64*1024);
-        unsafe { buffer.set_len(buffer.capacity()); }
-        Self { reader, buffer }
-    }
-
-    pub fn with_buffer_size(reader: R, buffer_size: usize) -> Self {
-        let mut buffer = Vec::with_capacity(buffer_size);
-        unsafe { buffer.set_len(buffer.capacity()); }
-        Self { reader, buffer }
-    }
-}
-
-impl<R: AsyncRead + Unpin> Stream for AsyncReaderStream<R> {
-    type Item = Result<Vec<u8>, io::Error>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
-        let this = self.get_mut();
-        let mut read_buf = ReadBuf::new(&mut this.buffer);
-        match ready!(Pin::new(&mut this.reader).poll_read(cx, &mut read_buf)) {
-            Ok(()) => {
-                let n = read_buf.filled().len();
-                if n == 0 {
-                    // EOF
-                    Poll::Ready(None)
-                } else {
-                    Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
-                }
-            }
-            Err(err) => Poll::Ready(Some(Err(err))),
-        }
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use std::io;
-
-    use anyhow::Error;
-    use futures::stream::TryStreamExt;
-
-    #[test]
-    fn test_wrapped_stream_reader() -> Result<(), Error> {
-        pbs_runtime::main(async {
-            run_wrapped_stream_reader_test().await
-        })
-    }
-
-    struct DummyReader(usize);
-
-    impl io::Read for DummyReader {
-        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-            self.0 += 1;
-
-            if self.0 >= 10 {
-                return Ok(0);
-            }
-
-            unsafe {
-                std::ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len());
-            }
-
-            Ok(buf.len())
-        }
-    }
-
-    async fn run_wrapped_stream_reader_test() -> Result<(), Error> {
-        let mut reader = super::WrappedReaderStream::new(DummyReader(0));
-        while let Some(_data) = reader.try_next().await? {
-            // just waiting
-        }
-        Ok(())
-    }
-}
-
-/// Wrapper around tokio::sync::mpsc::Sender, which implements Write
-pub struct AsyncChannelWriter {
-    sender: Option<Sender<Result<Vec<u8>, Error>>>,
-    buf: ByteBuffer,
-    state: WriterState,
-}
-
-type SendResult = io::Result<Sender<Result<Vec<u8>, Error>>>;
-
-enum WriterState {
-    Ready,
-    Sending(Pin<Box<dyn Future<Output = SendResult> + Send + 'static>>),
-}
-
-impl AsyncChannelWriter {
-    pub fn new(sender: Sender<Result<Vec<u8>, Error>>, buf_size: usize) -> Self {
-        Self {
-            sender: Some(sender),
-            buf: ByteBuffer::with_capacity(buf_size),
-            state: WriterState::Ready,
-        }
-    }
-
-    fn poll_write_impl(
-        &mut self,
-        cx: &mut Context,
-        buf: &[u8],
-        flush: bool,
-    ) -> Poll<io::Result<usize>> {
-        loop {
-            match &mut self.state {
-                WriterState::Ready => {
-                    if flush {
-                        if self.buf.is_empty() {
-                            return Poll::Ready(Ok(0));
-                        }
-                    } else {
-                        let free_size = self.buf.free_size();
-                        if free_size > buf.len() || self.buf.is_empty() {
-                            let count = free_size.min(buf.len());
-                            self.buf.get_free_mut_slice()[..count].copy_from_slice(&buf[..count]);
-                            self.buf.add_size(count);
-                            return Poll::Ready(Ok(count));
-                        }
-                    }
-
-                    let sender = match self.sender.take() {
-                        Some(sender) => sender,
-                        None => return Poll::Ready(Err(io_err_other("no sender"))),
-                    };
-
-                    let data = self.buf.remove_data(self.buf.len()).to_vec();
-                    let future = async move {
-                        sender
-                            .send(Ok(data))
-                            .await
-                            .map(move |_| sender)
-                            .map_err(|err| io_format_err!("could not send: {}", err))
-                    };
-
-                    self.state = WriterState::Sending(future.boxed());
-                }
-                WriterState::Sending(ref mut future) => match ready!(future.as_mut().poll(cx)) {
-                    Ok(sender) => {
-                        self.sender = Some(sender);
-                        self.state = WriterState::Ready;
-                    }
-                    Err(err) => return Poll::Ready(Err(err)),
-                },
-            }
-        }
-    }
-}
-
-impl AsyncWrite for AsyncChannelWriter {
-    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
-        let this = self.get_mut();
-        this.poll_write_impl(cx, buf, false)
-    }
-
-    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
-        let this = self.get_mut();
-        match ready!(this.poll_write_impl(cx, &[], true)) {
-            Ok(_) => Poll::Ready(Ok(())),
-            Err(err) => Poll::Ready(Err(err)),
-        }
-    }
-
-    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
-        self.poll_flush(cx)
-    }
-}
diff --git a/pbs-tools/src/tokio/mod.rs b/pbs-tools/src/tokio/mod.rs
deleted file mode 100644
index 43fc107b..00000000
--- a/pbs-tools/src/tokio/mod.rs
+++ /dev/null
@@ -1,2 +0,0 @@
-pub mod tokio_writer_adapter;
-pub use tokio_writer_adapter::TokioWriterAdapter;
diff --git a/pbs-tools/src/tokio/tokio_writer_adapter.rs b/pbs-tools/src/tokio/tokio_writer_adapter.rs
deleted file mode 100644
index 7b7f5dcf..00000000
--- a/pbs-tools/src/tokio/tokio_writer_adapter.rs
+++ /dev/null
@@ -1,26 +0,0 @@
-use std::io::Write;
-
-use tokio::task::block_in_place;
-
-/// Wrapper around a writer which implements Write
-///
-/// wraps each write with a 'block_in_place' so that
-/// any (blocking) writer can be safely used in async context in a
-/// tokio runtime
-pub struct TokioWriterAdapter<W: Write>(W);
-
-impl<W: Write> TokioWriterAdapter<W> {
-    pub fn new(writer: W) -> Self {
-        Self(writer)
-    }
-}
-
-impl<W: Write> Write for TokioWriterAdapter<W> {
-    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
-        block_in_place(|| self.0.write(buf))
-    }
-
-    fn flush(&mut self) -> Result<(), std::io::Error> {
-        block_in_place(|| self.0.flush())
-    }
-}
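Note: `TokioWriterAdapter` wraps each `write`/`flush` of a blocking writer in `block_in_place`; after this patch it comes from `proxmox_async::tokio_writer_adapter` (see the pxar_backup_stream.rs and proxmox-backup-client main.rs hunks). A sketch:

```rust
use std::io::Write;

use proxmox_async::tokio_writer_adapter::TokioWriterAdapter;

// Safe to call from inside a tokio worker: each write blocks the thread
// only after notifying the runtime via block_in_place.
fn write_report(sink: impl Write) -> Result<(), std::io::Error> {
    let mut writer = TokioWriterAdapter::new(sink);
    writer.write_all(b"backup finished\n")?;
    writer.flush()
}
```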
diff --git a/pbs-tools/src/zip.rs b/pbs-tools/src/zip.rs
deleted file mode 100644
index 0e85dd25..00000000
--- a/pbs-tools/src/zip.rs
+++ /dev/null
@@ -1,672 +0,0 @@
-//! ZIP Helper
-//!
-//! Provides an interface to create a ZIP File from ZipEntries
-//! for a more detailed description of the ZIP format, see:
-//! https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT
-
-use std::convert::TryInto;
-use std::ffi::OsString;
-use std::io;
-use std::mem::size_of;
-use std::os::unix::ffi::OsStrExt;
-use std::path::{Component, Path, PathBuf};
-use std::pin::Pin;
-use std::task::{Context, Poll};
-use std::time::SystemTime;
-
-use anyhow::{format_err, Error, Result};
-use endian_trait::Endian;
-use futures::ready;
-use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
-
-use crc32fast::Hasher;
-use proxmox_time::gmtime;
-
-use crate::compression::{DeflateEncoder, Level};
-
-const LOCAL_FH_SIG: u32 = 0x04034B50;
-const LOCAL_FF_SIG: u32 = 0x08074B50;
-const CENTRAL_DIRECTORY_FH_SIG: u32 = 0x02014B50;
-const END_OF_CENTRAL_DIR: u32 = 0x06054B50;
-const VERSION_NEEDED: u16 = 0x002d;
-const VERSION_MADE_BY: u16 = 0x032d;
-
-const ZIP64_EOCD_RECORD: u32 = 0x06064B50;
-const ZIP64_EOCD_LOCATOR: u32 = 0x07064B50;
-
-// bits for date:
-// 0-4: day of the month (1-31)
-// 5-8: month: (1 = jan, etc.)
-// 9-15: year offset from 1980
-//
-// bits for time:
-// 0-4: second / 2
-// 5-10: minute (0-59)
-// 11-15: hour (0-23)
-//
-// see https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-filetimetodosdatetime
-fn epoch_to_dos(epoch: i64) -> (u16, u16) {
-    let gmtime = match gmtime(epoch) {
-        Ok(gmtime) => gmtime,
-        Err(_) => return (0, 0),
-    };
-
-    let seconds = (gmtime.tm_sec / 2) & 0b11111;
-    let minutes = gmtime.tm_min & 0b111111;
-    let hours = gmtime.tm_hour & 0b11111;
-    let time: u16 = ((hours << 11) | (minutes << 5) | (seconds)) as u16;
-
-    let date: u16 = if gmtime.tm_year > (2108 - 1900) || gmtime.tm_year < (1980 - 1900) {
-        0
-    } else {
-        let day = gmtime.tm_mday & 0b11111;
-        let month = (gmtime.tm_mon + 1) & 0b1111;
-        let year = (gmtime.tm_year + 1900 - 1980) & 0b1111111;
-        ((year << 9) | (month << 5) | (day)) as u16
-    };
-
-    (date, time)
-}
-
-#[derive(Endian)]
-#[repr(C, packed)]
-struct Zip64Field {
-    field_type: u16,
-    field_size: u16,
-    uncompressed_size: u64,
-    compressed_size: u64,
-}
-
-#[derive(Endian)]
-#[repr(C, packed)]
-struct Zip64FieldWithOffset {
-    field_type: u16,
-    field_size: u16,
-    uncompressed_size: u64,
-    compressed_size: u64,
-    offset: u64,
-    start_disk: u32,
-}
-
-#[derive(Endian)]
-#[repr(C, packed)]
-struct LocalFileHeader {
-    signature: u32,
-    version_needed: u16,
-    flags: u16,
-    compression: u16,
-    time: u16,
-    date: u16,
-    crc32: u32,
-    compressed_size: u32,
-    uncompressed_size: u32,
-    filename_len: u16,
-    extra_field_len: u16,
-}
-
-#[derive(Endian)]
-#[repr(C, packed)]
-struct LocalFileFooter {
-    signature: u32,
-    crc32: u32,
-    compressed_size: u64,
-    uncompressed_size: u64,
-}
-
-#[derive(Endian)]
-#[repr(C, packed)]
-struct CentralDirectoryFileHeader {
-    signature: u32,
-    version_made_by: u16,
-    version_needed: u16,
-    flags: u16,
-    compression: u16,
-    time: u16,
-    date: u16,
-    crc32: u32,
-    compressed_size: u32,
-    uncompressed_size: u32,
-    filename_len: u16,
-    extra_field_len: u16,
-    comment_len: u16,
-    start_disk: u16,
-    internal_flags: u16,
-    external_flags: u32,
-    offset: u32,
-}
-
-#[derive(Endian)]
-#[repr(C, packed)]
-struct EndOfCentralDir {
-    signature: u32,
-    disk_number: u16,
-    start_disk: u16,
-    disk_record_count: u16,
-    total_record_count: u16,
-    directory_size: u32,
-    directory_offset: u32,
-    comment_len: u16,
-}
-
-#[derive(Endian)]
-#[repr(C, packed)]
-struct Zip64EOCDRecord {
-    signature: u32,
-    field_size: u64,
-    version_made_by: u16,
-    version_needed: u16,
-    disk_number: u32,
-    disk_number_central_dir: u32,
-    disk_record_count: u64,
-    total_record_count: u64,
-    directory_size: u64,
-    directory_offset: u64,
-}
-
-#[derive(Endian)]
-#[repr(C, packed)]
-struct Zip64EOCDLocator {
-    signature: u32,
-    disk_number: u32,
-    offset: u64,
-    disk_count: u32,
-}
-
-async fn write_struct<E, T>(output: &mut T, data: E) -> io::Result<()>
-where
-    T: AsyncWrite + ?Sized + Unpin,
-    E: Endian,
-{
-    let data = data.to_le();
-
-    let data = unsafe {
-        std::slice::from_raw_parts(
-            &data as *const E as *const u8,
-            core::mem::size_of_val(&data),
-        )
-    };
-    output.write_all(data).await
-}
-
-/// Represents an Entry in a ZIP File
-///
-/// used to add to a ZipEncoder
-pub struct ZipEntry {
-    filename: OsString,
-    mtime: i64,
-    mode: u16,
-    crc32: u32,
-    uncompressed_size: u64,
-    compressed_size: u64,
-    offset: u64,
-    is_file: bool,
-}
-
-impl ZipEntry {
-    /// Creates a new ZipEntry
-    ///
-    /// if is_file is false the path will contain a trailing separator,
-    /// so that the zip file understands that it is a directory
-    pub fn new<P: AsRef<Path>>(path: P, mtime: i64, mode: u16, is_file: bool) -> Self {
-        let mut relpath = PathBuf::new();
-
-        for comp in path.as_ref().components() {
-            if let Component::Normal(_) = comp {
-                relpath.push(comp);
-            }
-        }
-
-        if !is_file {
-            relpath.push(""); // adds trailing slash
-        }
-
-        Self {
-            filename: relpath.into(),
-            crc32: 0,
-            mtime,
-            mode,
-            uncompressed_size: 0,
-            compressed_size: 0,
-            offset: 0,
-            is_file,
-        }
-    }
-
-    async fn write_local_header<W>(&self, mut buf: &mut W) -> io::Result<usize>
-    where
-        W: AsyncWrite + Unpin + ?Sized,
-    {
-        let filename = self.filename.as_bytes();
-        let filename_len = filename.len();
-        let header_size = size_of::<LocalFileHeader>();
-        let zip_field_size = size_of::<Zip64Field>();
-        let size: usize = header_size + filename_len + zip_field_size;
-
-        let (date, time) = epoch_to_dos(self.mtime);
-
-        write_struct(
-            &mut buf,
-            LocalFileHeader {
-                signature: LOCAL_FH_SIG,
-                version_needed: 0x2d,
-                flags: 1 << 3,
-                compression: 0x8,
-                time,
-                date,
-                crc32: 0,
-                compressed_size: 0xFFFFFFFF,
-                uncompressed_size: 0xFFFFFFFF,
-                filename_len: filename_len as u16,
-                extra_field_len: zip_field_size as u16,
-            },
-        )
-        .await?;
-
-        buf.write_all(filename).await?;
-
-        write_struct(
-            &mut buf,
-            Zip64Field {
-                field_type: 0x0001,
-                field_size: 2 * 8,
-                uncompressed_size: 0,
-                compressed_size: 0,
-            },
-        )
-        .await?;
-
-        Ok(size)
-    }
-
-    async fn write_data_descriptor<W: AsyncWrite + Unpin + ?Sized>(
-        &self,
-        mut buf: &mut W,
-    ) -> io::Result<usize> {
-        let size = size_of::<LocalFileFooter>();
-
-        write_struct(
-            &mut buf,
-            LocalFileFooter {
-                signature: LOCAL_FF_SIG,
-                crc32: self.crc32,
-                compressed_size: self.compressed_size,
-                uncompressed_size: self.uncompressed_size,
-            },
-        )
-        .await?;
-
-        Ok(size)
-    }
-
-    async fn write_central_directory_header<W: AsyncWrite + Unpin + ?Sized>(
-        &self,
-        mut buf: &mut W,
-    ) -> io::Result<usize> {
-        let filename = self.filename.as_bytes();
-        let filename_len = filename.len();
-        let header_size = size_of::<CentralDirectoryFileHeader>();
-        let zip_field_size = size_of::<Zip64FieldWithOffset>();
-        let mut size: usize = header_size + filename_len;
-
-        let (date, time) = epoch_to_dos(self.mtime);
-
-        let (compressed_size, uncompressed_size, offset, need_zip64) = if self.compressed_size
-            >= (u32::MAX as u64)
-            || self.uncompressed_size >= (u32::MAX as u64)
-            || self.offset >= (u32::MAX as u64)
-        {
-            size += zip_field_size;
-            (0xFFFFFFFF, 0xFFFFFFFF, 0xFFFFFFFF, true)
-        } else {
-            (
-                self.compressed_size as u32,
-                self.uncompressed_size as u32,
-                self.offset as u32,
-                false,
-            )
-        };
-
-        write_struct(
-            &mut buf,
-            CentralDirectoryFileHeader {
-                signature: CENTRAL_DIRECTORY_FH_SIG,
-                version_made_by: VERSION_MADE_BY,
-                version_needed: VERSION_NEEDED,
-                flags: 1 << 3,
-                compression: 0x8,
-                time,
-                date,
-                crc32: self.crc32,
-                compressed_size,
-                uncompressed_size,
-                filename_len: filename_len as u16,
-                extra_field_len: if need_zip64 { zip_field_size as u16 } else { 0 },
-                comment_len: 0,
-                start_disk: 0,
-                internal_flags: 0,
-                external_flags: (self.mode as u32) << 16 | (!self.is_file as u32) << 4,
-                offset,
-            },
-        )
-        .await?;
-
-        buf.write_all(filename).await?;
-
-        if need_zip64 {
-            write_struct(
-                &mut buf,
-                Zip64FieldWithOffset {
-                    field_type: 1,
-                    field_size: 3 * 8 + 4,
-                    uncompressed_size: self.uncompressed_size,
-                    compressed_size: self.compressed_size,
-                    offset: self.offset,
-                    start_disk: 0,
-                },
-            )
-            .await?;
-        }
-
-        Ok(size)
-    }
-}
-
-// wraps an AsyncRead and calculates the hash
-struct HashWrapper<R> {
-    inner: R,
-    hasher: Hasher,
-}
-
-impl<R> HashWrapper<R> {
-    fn new(inner: R) -> Self {
-        Self {
-            inner,
-            hasher: Hasher::new(),
-        }
-    }
-
-    // consumes self and returns the hash and the reader
-    fn finish(self) -> (u32, R) {
-        let crc32 = self.hasher.finalize();
-        (crc32, self.inner)
-    }
-}
-
-impl<R> AsyncRead for HashWrapper<R>
-where
-    R: AsyncRead + Unpin,
-{
-    fn poll_read(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-        buf: &mut ReadBuf<'_>,
-    ) -> Poll<io::Result<()>> {
-        let this = self.get_mut();
-        let old_len = buf.filled().len();
-        ready!(Pin::new(&mut this.inner).poll_read(cx, buf))?;
-        let new_len = buf.filled().len();
-        if new_len > old_len {
-            this.hasher.update(&buf.filled()[old_len..new_len]);
-        }
-        Poll::Ready(Ok(()))
-    }
-}
-
-/// Wraps a writer that implements AsyncWrite for creating a ZIP archive
-///
-/// This will create a ZIP archive on the fly with files added with
-/// 'add_entry'.
-/// To finish the file, call 'finish'
-/// Example:
-/// ```no_run
-/// use anyhow::{Error, Result};
-/// use tokio::fs::File;
-///
-/// use pbs_tools::zip::{ZipEncoder, ZipEntry};
-///
-/// #[tokio::main]
-/// async fn main() -> Result<(), Error> {
-///     let target = File::create("foo.zip").await?;
-///     let mut source = File::open("foo.txt").await?;
-///
-///     let mut zip = ZipEncoder::new(target);
-///     zip.add_entry(ZipEntry::new(
-///         "foo.txt",
-///         0,
-///         0o100755,
-///         true,
-///     ), Some(source)).await?;
-///
-///     zip.finish().await?;
-///
-///     Ok(())
-/// }
-/// ```
-pub struct ZipEncoder<W>
-where
-    W: AsyncWrite + Unpin,
-{
-    byte_count: usize,
-    files: Vec<ZipEntry>,
-    target: Option<W>,
-}
-
-impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
-    pub fn new(target: W) -> Self {
-        Self {
-            byte_count: 0,
-            files: Vec::new(),
-            target: Some(target),
-        }
-    }
-
-    pub async fn add_entry<R: AsyncRead + Unpin>(
-        &mut self,
-        mut entry: ZipEntry,
-        content: Option<R>,
-    ) -> Result<(), Error> {
-        let mut target = self
-            .target
-            .take()
-            .ok_or_else(|| format_err!("had no target during add entry"))?;
-        entry.offset = self.byte_count.try_into()?;
-        self.byte_count += entry.write_local_header(&mut target).await?;
-        if let Some(content) = content {
-            let mut reader = HashWrapper::new(content);
-            let mut enc = DeflateEncoder::with_quality(target, Level::Fastest);
-
-            enc.compress(&mut reader).await?;
-            let total_in = enc.total_in();
-            let total_out = enc.total_out();
-            target = enc.into_inner();
-
-            let (crc32, _reader) = reader.finish();
-
-            self.byte_count += total_out as usize;
-            entry.compressed_size = total_out;
-            entry.uncompressed_size = total_in;
-
-            entry.crc32 = crc32;
-        }
-        self.byte_count += entry.write_data_descriptor(&mut target).await?;
-        self.target = Some(target);
-
-        self.files.push(entry);
-
-        Ok(())
-    }
-
-    async fn write_eocd(
-        &mut self,
-        central_dir_size: usize,
-        central_dir_offset: usize,
-    ) -> Result<(), Error> {
-        let entrycount = self.files.len();
-        let mut target = self
-            .target
-            .take()
-            .ok_or_else(|| format_err!("had no target during write_eocd"))?;
-
-        let mut count = entrycount as u16;
-        let mut directory_size = central_dir_size as u32;
-        let mut directory_offset = central_dir_offset as u32;
-
-        if central_dir_size > u32::MAX as usize
-            || central_dir_offset > u32::MAX as usize
-            || entrycount > u16::MAX as usize
-        {
-            count = 0xFFFF;
-            directory_size = 0xFFFFFFFF;
-            directory_offset = 0xFFFFFFFF;
-
-            write_struct(
-                &mut target,
-                Zip64EOCDRecord {
-                    signature: ZIP64_EOCD_RECORD,
-                    field_size: 44,
-                    version_made_by: VERSION_MADE_BY,
-                    version_needed: VERSION_NEEDED,
-                    disk_number: 0,
-                    disk_number_central_dir: 0,
-                    disk_record_count: entrycount.try_into()?,
-                    total_record_count: entrycount.try_into()?,
-                    directory_size: central_dir_size.try_into()?,
-                    directory_offset: central_dir_offset.try_into()?,
-                },
-            )
-            .await?;
-
-            let locator_offset = central_dir_offset + central_dir_size;
-
-            write_struct(
-                &mut target,
-                Zip64EOCDLocator {
-                    signature: ZIP64_EOCD_LOCATOR,
-                    disk_number: 0,
-                    offset: locator_offset.try_into()?,
-                    disk_count: 1,
-                },
-            )
-            .await?;
-        }
-
-        write_struct(
-            &mut target,
-            EndOfCentralDir {
-                signature: END_OF_CENTRAL_DIR,
-                disk_number: 0,
-                start_disk: 0,
-                disk_record_count: count,
-                total_record_count: count,
-                directory_size,
-                directory_offset,
-                comment_len: 0,
-            },
-        )
-        .await?;
-
-        self.target = Some(target);
-
-        Ok(())
-    }
-
-    pub async fn finish(&mut self) -> Result<(), Error> {
-        let mut target = self
-            .target
-            .take()
-            .ok_or_else(|| format_err!("had no target during finish"))?;
-        let central_dir_offset = self.byte_count;
-        let mut central_dir_size = 0;
-
-        for file in &self.files {
-            central_dir_size += file.write_central_directory_header(&mut target).await?;
-        }
-
-        self.target = Some(target);
-        self.write_eocd(central_dir_size, central_dir_offset)
-            .await?;
-
-        self.target
-            .take()
-            .ok_or_else(|| format_err!("had no target for flush"))?
-            .flush()
-            .await?;
-
-        Ok(())
-    }
-}
-
-/// Zip a local directory and write encoded data to target. "source" has to point to a valid
-/// directory, its name will be the root of the zip file - e.g.:
-/// source:
-///           /foo/bar
-/// zip file:
-///           /bar/file1
-///           /bar/dir1
-///           /bar/dir1/file2
-///           ...
-/// ...except if "source" is the root directory
-pub async fn zip_directory<W>(target: W, source: &Path) -> Result<(), Error>
-where
-    W: AsyncWrite + Unpin + Send,
-{
-    use walkdir::WalkDir;
-    use std::os::unix::fs::MetadataExt;
-
-    let base_path = source.parent().unwrap_or_else(|| Path::new("/"));
-    let mut encoder = ZipEncoder::new(target);
-
-    for entry in WalkDir::new(&source).into_iter() {
-        match entry {
-            Ok(entry) => {
-                let entry_path = entry.path().to_owned();
-                let encoder = &mut encoder;
-
-                if let Err(err) = async move {
-                    let entry_path_no_base = entry.path().strip_prefix(base_path)?;
-                    let metadata = entry.metadata()?;
-                    let mtime = match metadata.modified().unwrap_or_else(|_| SystemTime::now()).duration_since(SystemTime::UNIX_EPOCH) {
-                        Ok(dur) => dur.as_secs() as i64,
-                        Err(time_error) => -(time_error.duration().as_secs() as i64)
-                    };
-                    let mode = metadata.mode() as u16;
-
-                    if entry.file_type().is_file() {
-                        let file = tokio::fs::File::open(entry.path()).await?;
-                        let ze = ZipEntry::new(
-                            &entry_path_no_base,
-                            mtime,
-                            mode,
-                            true,
-                        );
-                        encoder.add_entry(ze, Some(file)).await?;
-                    } else if entry.file_type().is_dir() {
-                        let ze = ZipEntry::new(
-                            &entry_path_no_base,
-                            mtime,
-                            mode,
-                            false,
-                        );
-                        let content: Option<tokio::fs::File> = None;
-                        encoder.add_entry(ze, content).await?;
-                    }
-                    // ignore other file types
-                    let ok: Result<(), Error> = Ok(());
-                    ok
-                }
-                .await
-                {
-                    eprintln!(
-                        "zip: error encoding file or directory '{}': {}",
-                        entry_path.display(),
-                        err
-                    );
-                }
-            }
-            Err(err) => {
-                eprintln!("zip: error reading directory entry: {}", err);
-            }
-        }
-    }
-
-    encoder.finish().await
-}
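Note: `zip_directory` is the consumer-facing helper (the restore daemon imports it from `proxmox_async::zip` below). A usage sketch with hypothetical paths:

```rust
use std::path::Path;

// Stream a directory tree into a ZIP file on disk.
async fn zip_dir_to_file(dir: &Path, out: &Path) -> Result<(), anyhow::Error> {
    let target = tokio::fs::File::create(out).await?;
    proxmox_async::zip::zip_directory(target, dir).await?;
    Ok(())
}
```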
diff --git a/proxmox-backup-client/Cargo.toml b/proxmox-backup-client/Cargo.toml
index d83c07a9..6b0348c0 100644
--- a/proxmox-backup-client/Cargo.toml
+++ b/proxmox-backup-client/Cargo.toml
@@ -23,6 +23,7 @@ pathpatterns = "0.1.2"
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
 proxmox = { version = "0.15.3", features = [ "sortable-macro" ] }
+proxmox-async = "0.1"
 proxmox-router = { version = "1.1", features = [ "cli" ] }
 proxmox-schema = { version = "1", features = [ "api-macro" ] }
 proxmox-time = "1"
@@ -33,5 +34,4 @@ pbs-config = { path = "../pbs-config" }
 pbs-client = { path = "../pbs-client" }
 pbs-datastore = { path = "../pbs-datastore" }
 pbs-fuse-loop = { path = "../pbs-fuse-loop" }
-pbs-runtime = { path = "../pbs-runtime" }
 pbs-tools = { path = "../pbs-tools" }
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index c5189e04..ec479faa 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -17,6 +17,7 @@ use proxmox::tools::fs::{file_get_json, replace_file, CreateOptions, image_size};
 use proxmox_router::{ApiMethod, RpcEnvironment, cli::*};
 use proxmox_schema::api;
 use proxmox_time::{strftime_local, epoch_i64};
+use proxmox_async::tokio_writer_adapter::TokioWriterAdapter;
 use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
 
 use pbs_api_types::{
@@ -67,7 +68,6 @@ use pbs_datastore::manifest::{
 };
 use pbs_datastore::read_chunk::AsyncReadChunk;
 use pbs_tools::sync::StdChannelWriter;
-use pbs_tools::tokio::TokioWriterAdapter;
 use pbs_tools::json;
 use pbs_tools::crypt_config::CryptConfig;
 
@@ -486,7 +486,7 @@ fn spawn_catalog_upload(
     encrypt: bool,
 ) -> Result<CatalogUploadResult, Error> {
     let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes
-    let catalog_stream = pbs_tools::blocking::StdChannelStream(catalog_rx);
+    let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx);
     let catalog_chunk_size = 512*1024;
     let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
 
@@ -1524,6 +1524,6 @@ fn main() {
     let rpcenv = CliEnvironment::new();
 
     run_cli_command(cmd_def, rpcenv, Some(|future| {
-        pbs_runtime::main(future)
+        proxmox_async::runtime::main(future)
     }));
 }
diff --git a/proxmox-backup-client/src/mount.rs b/proxmox-backup-client/src/mount.rs
index 265622f3..f0759d1b 100644
--- a/proxmox-backup-client/src/mount.rs
+++ b/proxmox-backup-client/src/mount.rs
@@ -135,7 +135,7 @@ fn mount(
     if verbose {
         // This will stay in foreground with debug output enabled as None is
         // passed for the RawFd.
-        return pbs_runtime::main(mount_do(param, None));
+        return proxmox_async::runtime::main(mount_do(param, None));
     }
 
     // Process should be daemonized.
@@ -151,7 +151,7 @@ fn mount(
         Ok(ForkResult::Child) => {
             drop(pr);
             nix::unistd::setsid().unwrap();
-            pbs_runtime::main(mount_do(param, Some(pw)))
+            proxmox_async::runtime::main(mount_do(param, Some(pw)))
         }
         Err(_) => bail!("failed to daemonize process"),
     }
diff --git a/proxmox-file-restore/Cargo.toml b/proxmox-file-restore/Cargo.toml
index a48ca0c3..bbd66705 100644
--- a/proxmox-file-restore/Cargo.toml
+++ b/proxmox-file-restore/Cargo.toml
@@ -17,6 +17,7 @@ tokio = { version = "1.6", features = [ "io-std", "rt", "rt-multi-thread", "time" ] }
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
 proxmox = { version = "0.15.3" }
+proxmox-async = "0.1"
 proxmox-lang = "1"
 proxmox-router = { version = "1.1", features = [ "cli" ] }
 proxmox-schema = { version = "1", features = [ "api-macro" ] }
@@ -29,5 +30,4 @@ pbs-buildcfg = { path = "../pbs-buildcfg" }
 pbs-config = { path = "../pbs-config" }
 pbs-client = { path = "../pbs-client" }
 pbs-datastore = { path = "../pbs-datastore" }
-pbs-runtime = { path = "../pbs-runtime" }
 pbs-tools = { path = "../pbs-tools" }
diff --git a/proxmox-file-restore/src/main.rs b/proxmox-file-restore/src/main.rs
index dc507dc1..c266d014 100644
--- a/proxmox-file-restore/src/main.rs
+++ b/proxmox-file-restore/src/main.rs
@@ -478,7 +478,7 @@ fn main() {
     run_cli_command(
         cmd_def,
         rpcenv,
-        Some(|future| pbs_runtime::main(future)),
+        Some(|future| proxmox_async::runtime::main(future)),
     );
 }
diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index 71afbc8f..a6b51a65 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -31,6 +31,7 @@ tower-service = "0.3.0"
 url = "2.1"
 
 proxmox = "0.15.3"
+proxmox-async = "0.1"
 proxmox-io = "1"
 proxmox-lang = "1"
 proxmox-http = { version = "0.5.0", features = [ "client" ] }
diff --git a/proxmox-backup-client/src/mount.rs b/proxmox-backup-client/src/mount.rs
index 265622f3..f0759d1b 100644
--- a/proxmox-backup-client/src/mount.rs
+++ b/proxmox-backup-client/src/mount.rs
@@ -135,7 +135,7 @@ fn mount(
 
     if verbose {
         // This will stay in foreground with debug output enabled as None is
        // passed for the RawFd.
-        return pbs_runtime::main(mount_do(param, None));
+        return proxmox_async::runtime::main(mount_do(param, None));
     }
 
     // Process should be daemonized.
@@ -151,7 +151,7 @@ fn mount(
         Ok(ForkResult::Child) => {
            drop(pr);
            nix::unistd::setsid().unwrap();
-            pbs_runtime::main(mount_do(param, Some(pw)))
+            proxmox_async::runtime::main(mount_do(param, Some(pw)))
         }
         Err(_) => bail!("failed to daemonize process"),
     }
diff --git a/proxmox-file-restore/Cargo.toml b/proxmox-file-restore/Cargo.toml
index a48ca0c3..bbd66705 100644
--- a/proxmox-file-restore/Cargo.toml
+++ b/proxmox-file-restore/Cargo.toml
@@ -17,6 +17,7 @@ tokio = { version = "1.6", features = [ "io-std", "rt", "rt-multi-thread", "time
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
 proxmox = { version = "0.15.3" }
+proxmox-async = "0.1"
 proxmox-lang = "1"
 proxmox-router = { version = "1.1", features = [ "cli" ] }
 proxmox-schema = { version = "1", features = [ "api-macro" ] }
@@ -29,5 +30,4 @@ pbs-buildcfg = { path = "../pbs-buildcfg" }
 pbs-config = { path = "../pbs-config" }
 pbs-client = { path = "../pbs-client" }
 pbs-datastore = { path = "../pbs-datastore" }
-pbs-runtime = { path = "../pbs-runtime" }
 pbs-tools = { path = "../pbs-tools" }
diff --git a/proxmox-file-restore/src/main.rs b/proxmox-file-restore/src/main.rs
index dc507dc1..c266d014 100644
--- a/proxmox-file-restore/src/main.rs
+++ b/proxmox-file-restore/src/main.rs
@@ -478,7 +478,7 @@ fn main() {
     run_cli_command(
         cmd_def,
         rpcenv,
-        Some(|future| pbs_runtime::main(future)),
+        Some(|future| proxmox_async::runtime::main(future)),
     );
 }
diff --git a/proxmox-rest-server/Cargo.toml b/proxmox-rest-server/Cargo.toml
index 71afbc8f..a6b51a65 100644
--- a/proxmox-rest-server/Cargo.toml
+++ b/proxmox-rest-server/Cargo.toml
@@ -31,6 +31,7 @@ tower-service = "0.3.0"
 url = "2.1"
 
 proxmox = "0.15.3"
+proxmox-async = "0.1"
 proxmox-io = "1"
 proxmox-lang = "1"
 proxmox-http = { version = "0.5.0", features = [ "client" ] }
diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
index f27f703d..3a091026 100644
--- a/proxmox-rest-server/src/rest.rs
+++ b/proxmox-rest-server/src/rest.rs
@@ -33,8 +33,8 @@ use proxmox_schema::{
 
 use proxmox_http::client::RateLimitedStream;
 
-use pbs_tools::compression::{DeflateEncoder, Level};
-use pbs_tools::stream::AsyncReaderStream;
+use proxmox_async::compression::{DeflateEncoder, Level};
+use proxmox_async::stream::AsyncReaderStream;
 
 use crate::{
     ApiConfig, FileLogger, AuthError, RestEnvironment, CompressionMethod,
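Only the crate prefix changes here; DeflateEncoder and AsyncReaderStream keep their roles. AsyncReaderStream is what lets rest.rs serve a file or pipe as a streaming HTTP body. A sketch of that pattern, assuming the constructor survives the move unchanged (the handler function itself is invented):

    use anyhow::Error;
    use hyper::{Body, Response};
    use proxmox_async::stream::AsyncReaderStream;

    // Stream a file as the response body instead of buffering it in RAM.
    async fn stream_file(path: &str) -> Result<Response<Body>, Error> {
        let file = tokio::fs::File::open(path).await?;
        let body = Body::wrap_stream(AsyncReaderStream::new(file));
        Ok(Response::builder().status(200).body(body)?)
    }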
diff --git a/proxmox-rest-server/src/state.rs b/proxmox-rest-server/src/state.rs
index 955e3ce3..e4234c76 100644
--- a/proxmox-rest-server/src/state.rs
+++ b/proxmox-rest-server/src/state.rs
@@ -6,7 +6,7 @@ use futures::*;
 
 use tokio::signal::unix::{signal, SignalKind};
 
-use pbs_tools::broadcast_future::BroadcastData;
+use proxmox_async::broadcast_future::BroadcastData;
 
 use crate::request_shutdown;
diff --git a/proxmox-restore-daemon/Cargo.toml b/proxmox-restore-daemon/Cargo.toml
index 30800e38..c7ad9f04 100644
--- a/proxmox-restore-daemon/Cargo.toml
+++ b/proxmox-restore-daemon/Cargo.toml
@@ -27,12 +27,12 @@ pathpatterns = "0.1.2"
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
 proxmox = { version = "0.15.3", features = [ "sortable-macro" ] }
+proxmox-async = "0.1"
 proxmox-router = { version = "1.1", features = [ "cli" ] }
 proxmox-schema = { version = "1", features = [ "api-macro" ] }
 proxmox-time = "1"
 
 pbs-api-types = { path = "../pbs-api-types" }
-pbs-runtime = { path = "../pbs-runtime" }
 pbs-tools = { path = "../pbs-tools" }
 pbs-datastore = { path = "../pbs-datastore" }
 proxmox-rest-server = { path = "../proxmox-rest-server" }
diff --git a/proxmox-restore-daemon/src/main.rs b/proxmox-restore-daemon/src/main.rs
index a1ad6005..5a4a324e 100644
--- a/proxmox-restore-daemon/src/main.rs
+++ b/proxmox-restore-daemon/src/main.rs
@@ -63,7 +63,7 @@ fn main() -> Result<(), Error> {
 
     info!("disk scan complete, starting main runtime...");
 
-    pbs_runtime::main(run())
+    proxmox_async::runtime::main(run())
 }
 
 /// ensure we have our /run dirs, system users and stuff like that setup
diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
index 429ba359..129a18fa 100644
--- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
+++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
@@ -19,13 +19,13 @@ use proxmox_router::{
     ApiHandler, ApiMethod, ApiResponseFuture, Permission, Router, RpcEnvironment, SubdirMap,
 };
 use proxmox_schema::*;
+use proxmox_async::zip::zip_directory;
 
 use pbs_api_types::file_restore::RestoreDaemonStatus;
 use pbs_client::pxar::{create_archive, Flags, PxarCreateOptions, ENCODER_MAX_ENTRIES};
 use pbs_datastore::catalog::{ArchiveEntry, DirEntryAttribute};
 use pbs_tools::fs::read_subdir;
 use pbs_tools::json::required_string_param;
-use pbs_tools::zip::zip_directory;
 
 use pxar::encoder::aio::TokioWriter;
diff --git a/pxar-bin/Cargo.toml b/pxar-bin/Cargo.toml
index d44a3ffd..732b3262 100644
--- a/pxar-bin/Cargo.toml
+++ b/pxar-bin/Cargo.toml
@@ -17,10 +17,10 @@ tokio = { version = "1.6", features = [ "rt", "rt-multi-thread" ] }
 pathpatterns = "0.1.2"
 
 proxmox = "0.15.3"
+proxmox-async = "0.1"
 proxmox-schema = { version = "1", features = [ "api-macro" ] }
 proxmox-router = "1.1"
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
 pbs-client = { path = "../pbs-client" }
-pbs-runtime = { path = "../pbs-runtime" }
 pbs-tools = { path = "../pbs-tools" }
diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
index 6f60494d..cb31bf75 100644
--- a/pxar-bin/src/main.rs
+++ b/pxar-bin/src/main.rs
@@ -488,6 +488,6 @@ fn main() {
     let rpcenv = CliEnvironment::new();
 
     run_cli_command(cmd_def, rpcenv, Some(|future| {
-        pbs_runtime::main(future)
+        proxmox_async::runtime::main(future)
     }));
 }
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 2ebe18c5..e95656ce 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -22,6 +22,8 @@ use proxmox_router::{
 };
 use proxmox_schema::*;
 use proxmox_sys::{task_log, task_warn};
+use proxmox_async::blocking::WrappedReaderStream;
+use proxmox_async::stream::{AsyncReaderStream, AsyncChannelWriter};
 
 use pxar::accessor::aio::Accessor;
 use pxar::EntryKind;
@@ -53,8 +55,6 @@ use pbs_datastore::fixed_index::{FixedIndexReader};
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{BackupManifest, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME};
 use pbs_datastore::prune::compute_prune_info;
-use pbs_tools::blocking::WrappedReaderStream;
-use pbs_tools::stream::{AsyncReaderStream, AsyncChannelWriter};
 use pbs_tools::json::{required_integer_param, required_string_param};
 use pbs_config::CachedUserInfo;
 use proxmox_rest_server::{WorkerTask, formatter};
diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
index eea62500..3a902e7a 100644
--- a/src/api2/backup/mod.rs
+++ b/src/api2/backup/mod.rs
@@ -226,7 +226,7 @@ async move {
         };
         if benchmark {
             env.log("benchmark finished successfully");
-            pbs_runtime::block_in_place(|| env.remove_backup())?;
+            proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
             return Ok(());
         }
 
@@ -254,13 +254,13 @@ async move {
             (Ok(_), Err(err)) => {
                 env.log(format!("backup ended and finish failed: {}", err));
                 env.log("removing unfinished backup");
-                pbs_runtime::block_in_place(|| env.remove_backup())?;
+                proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
                 Err(err)
             },
             (Err(err), Err(_)) => {
                 env.log(format!("backup failed: {}", err));
                 env.log("removing failed backup");
-                pbs_runtime::block_in_place(|| env.remove_backup())?;
+                proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
                 Err(err)
             },
         }
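The block_in_place calls above are the inverse of spawning: async code needs to run a blocking operation (the synchronous env.remove_backup()) without parking a runtime worker on it. A minimal sketch, assuming proxmox_async::runtime::block_in_place keeps the pbs-runtime behaviour of delegating to tokio::task::block_in_place inside a multi-threaded runtime and calling the closure directly otherwise (the directory-removal helper is invented):

    use anyhow::Error;

    // std::fs is blocking, so wrap it instead of awaiting it.
    async fn remove_backup_dir(path: std::path::PathBuf) -> Result<(), Error> {
        proxmox_async::runtime::block_in_place(|| {
            std::fs::remove_dir_all(&path)?;
            Ok(())
        })
    }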
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 75cb44e1..604a4ede 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -62,7 +62,7 @@ impl Future for UploadChunk {
 
             let (is_duplicate, compressed_size) = match proxmox_lang::try_block! {
                 let mut chunk = DataBlob::from_raw(raw_data)?;
 
-                pbs_runtime::block_in_place(|| {
+                proxmox_async::runtime::block_in_place(|| {
                     chunk.verify_unencrypted(this.size as usize, &this.digest)?;
 
                     // always comput CRC at server side
diff --git a/src/api2/node/apt.rs b/src/api2/node/apt.rs
index a31e73a1..564748a9 100644
--- a/src/api2/node/apt.rs
+++ b/src/api2/node/apt.rs
@@ -244,7 +244,7 @@ fn apt_get_changelog(
     let changelog_url = &pkg_info[0].change_log_url;
     // FIXME: use 'apt-get changelog' for proxmox packages as well, once repo supports it
     if changelog_url.starts_with("http://download.proxmox.com/") {
-        let changelog = pbs_runtime::block_on(client.get_string(changelog_url, None))
+        let changelog = proxmox_async::runtime::block_on(client.get_string(changelog_url, None))
             .map_err(|err| format_err!("Error downloading changelog from '{}': {}", changelog_url, err))?;
 
         Ok(json!(changelog))
@@ -268,7 +268,7 @@ fn apt_get_changelog(
         auth_header.insert("Authorization".to_owned(),
             format!("Basic {}", base64::encode(format!("{}:{}", key, id))));
 
-        let changelog = pbs_runtime::block_on(client.get_string(changelog_url, Some(&auth_header)))
+        let changelog = proxmox_async::runtime::block_on(client.get_string(changelog_url, Some(&auth_header)))
             .map_err(|err| format_err!("Error downloading changelog from '{}': {}", changelog_url, err))?;
 
         Ok(json!(changelog))
diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
index f418f50f..169ba613 100644
--- a/src/api2/reader/mod.rs
+++ b/src/api2/reader/mod.rs
@@ -289,7 +289,7 @@ fn download_chunk(
 
         env.debug(format!("download chunk {:?}", path));
 
-        let data = pbs_runtime::block_in_place(|| std::fs::read(path))
+        let data = proxmox_async::runtime::block_in_place(|| std::fs::read(path))
             .map_err(move |err| http_err!(BAD_REQUEST, "reading file {:?} failed: {}", path2, err))?;
 
         let body = Body::from(data);
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index 9eb20269..c5afe9dc 100644
--- a/src/bin/proxmox-backup-api.rs
+++ b/src/bin/proxmox-backup-api.rs
@@ -21,7 +21,7 @@ use proxmox_backup::config;
 fn main() {
     proxmox_backup::tools::setup_safe_path_env();
 
-    if let Err(err) = pbs_runtime::main(run()) {
+    if let Err(err) = proxmox_async::runtime::main(run()) {
         eprintln!("Error: {}", err);
         std::process::exit(-1);
     }
diff --git a/src/bin/proxmox-backup-debug.rs b/src/bin/proxmox-backup-debug.rs
index 35ef320a..33ac4d50 100644
--- a/src/bin/proxmox-backup-debug.rs
+++ b/src/bin/proxmox-backup-debug.rs
@@ -20,5 +20,5 @@ fn main() {
     let mut rpcenv = CliEnvironment::new();
     rpcenv.set_auth_id(Some(format!("{}@pam", username)));
 
-    run_cli_command(cmd_def, rpcenv, Some(|future| pbs_runtime::main(future)));
+    run_cli_command(cmd_def, rpcenv, Some(|future| proxmox_async::runtime::main(future)));
 }
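Taken together, every binary in this patch converges on the same entry-point shape: a synchronous main() that hands exactly one future to proxmox_async::runtime::main(), which blocks on it using the shared tokio runtime (assuming the pbs-runtime contract carries over unchanged). The skeleton, mirroring proxmox-backup-api.rs above:

    use anyhow::Error;

    async fn run() -> Result<(), Error> {
        // all async work lives behind this single entry point
        Ok(())
    }

    fn main() {
        if let Err(err) = proxmox_async::runtime::main(run()) {
            eprintln!("Error: {}", err);
            std::process::exit(-1);
        }
    }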
diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
index eea262c5..fa075e2e 100644
--- a/src/bin/proxmox-backup-manager.rs
+++ b/src/bin/proxmox-backup-manager.rs
@@ -440,7 +440,7 @@ fn main() -> Result<(), Error> {
 
     proxmox_backup::tools::setup_safe_path_env();
 
-    pbs_runtime::main(run())
+    proxmox_async::runtime::main(run())
 }
 
 fn get_sync_job(id: &String) -> Result<SyncJobConfig, Error> {
@@ -499,7 +499,7 @@ pub fn complete_remote_datastore_name(_arg: &str, param: &HashMap<String, String>
[...]
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ ... @@ fn main() -> Result<(), Error> {
         bail!("proxy not running as backup user or group (got uid {} gid {})", running_uid, running_gid);
     }
 
-    pbs_runtime::main(run())
+    proxmox_async::runtime::main(run())
 }
 
@@ -845,7 +845,7 @@ async fn schedule_task_log_rotate() {
 
             if logrotate.rotate(max_size)? {
                 println!("rotated access log, telling daemons to re-open log file");
-                pbs_runtime::block_on(command_reopen_access_logfiles())?;
+                proxmox_async::runtime::block_on(command_reopen_access_logfiles())?;
                 task_log!(worker, "API access log was rotated");
             } else {
                 task_log!(worker, "API access log was not rotated");
@@ -860,7 +860,7 @@ async fn schedule_task_log_rotate() {
 
             if logrotate.rotate(max_size)? {
                 println!("rotated auth log, telling daemons to re-open log file");
-                pbs_runtime::block_on(command_reopen_auth_logfiles())?;
+                proxmox_async::runtime::block_on(command_reopen_auth_logfiles())?;
                 task_log!(worker, "API authentication log was rotated");
             } else {
                 task_log!(worker, "API authentication log was not rotated");
diff --git a/src/bin/proxmox-daily-update.rs b/src/bin/proxmox-daily-update.rs
index 20745375..03e2ec02 100644
--- a/src/bin/proxmox-daily-update.rs
+++ b/src/bin/proxmox-daily-update.rs
@@ -104,7 +104,7 @@ fn main() {
     let mut rpcenv = CliEnvironment::new();
     rpcenv.set_auth_id(Some(String::from("root@pam")));
 
-    if let Err(err) = pbs_runtime::main(run(&mut rpcenv)) {
+    if let Err(err) = proxmox_async::runtime::main(run(&mut rpcenv)) {
         eprintln!("error during update: {}", err);
         std::process::exit(1);
     }
diff --git a/src/bin/proxmox-tape.rs b/src/bin/proxmox-tape.rs
index f49857dd..25a39601 100644
--- a/src/bin/proxmox-tape.rs
+++ b/src/bin/proxmox-tape.rs
@@ -74,7 +74,7 @@ pub fn complete_datastore_group_filter(_arg: &str, param: &HashMap<String, String>
[...]
diff --git a/src/bin/proxmox_backup_debug/api.rs b/src/bin/proxmox_backup_debug/api.rs
--- a/src/bin/proxmox_backup_debug/api.rs
+++ b/src/bin/proxmox_backup_debug/api.rs
@@ ... @@
         |complete_me: &str, _map: &HashMap<String, String>| {
-            pbs_runtime::block_on(async { complete_api_path_do(complete_me, $capability).await })
+            proxmox_async::runtime::block_on(async { complete_api_path_do(complete_me, $capability).await })
         }
     };
 }
diff --git a/src/server/pull.rs b/src/server/pull.rs
index c83a626e..97eee1e6 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -124,7 +124,7 @@ async fn pull_index_chunks(
             let verify_and_write_channel = verify_and_write_channel.clone();
 
             Ok::<_, Error>(async move {
-                let chunk_exists = pbs_runtime::block_in_place(|| {
+                let chunk_exists = proxmox_async::runtime::block_in_place(|| {
                     target.cond_touch_chunk(&info.digest, false)
                 })?;
                 if chunk_exists {
@@ -136,7 +136,7 @@ async fn pull_index_chunks(
                 let raw_size = chunk.raw_size() as usize;
 
                 // decode, verify and write in a separate threads to maximize throughput
-                pbs_runtime::block_in_place(|| {
+                proxmox_async::runtime::block_in_place(|| {
                     verify_and_write_channel.send((chunk, info.digest, info.size()))
                 })?;
 
diff --git a/src/tools/subscription.rs b/src/tools/subscription.rs
index 5b1e6b63..85a37ce2 100644
--- a/src/tools/subscription.rs
+++ b/src/tools/subscription.rs
@@ -231,7 +231,7 @@ pub fn check_subscription(key: String, server_id: String) -> Result<SubscriptionInfo, Error>
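The block_on conversions above go the other way around: synchronous code (log rotation, changelog download, the subscription check) drives an async operation to completion. A final sketch, assuming proxmox_async::runtime::block_on keeps the pbs-runtime behaviour of reusing, or lazily creating, the shared runtime (the config-reading helper is invented):

    use anyhow::Error;

    // Call an async operation from a plain synchronous function.
    fn read_config_blocking(path: &str) -> Result<String, Error> {
        proxmox_async::runtime::block_on(async move {
            Ok(tokio::fs::read_to_string(path).await?)
        })
    }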