git.proxmox.com Git - proxmox-backup.git/commitdiff
split out pbs-runtime module
author Wolfgang Bumiller <w.bumiller@proxmox.com>
Tue, 6 Jul 2021 10:08:44 +0000 (12:08 +0200)
committer Wolfgang Bumiller <w.bumiller@proxmox.com>
Tue, 6 Jul 2021 12:52:25 +0000 (14:52 +0200)
These are mostly tokio-specific "hacks" or "workarounds" that we
only really need/want in our binaries, so they no longer get
pulled in via our library crates.

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
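
For illustration (this sketch is not part of the commit and simply mirrors the
proxmox-backup-api change further down): after the split, a binary's entry
point depends on the small pbs-runtime crate directly, while the library
crates stay free of these runtime workarounds.

fn main() {
    // Mirrors src/bin/proxmox-backup-api.rs below: the binary hands its async
    // entry point to pbs_runtime::main instead of
    // proxmox_backup::tools::runtime::main.
    if let Err(err) = pbs_runtime::main(run()) {
        eprintln!("Error: {}", err);
        std::process::exit(-1);
    }
}

// Placeholder async entry point; the real binaries define their own.
async fn run() -> Result<(), anyhow::Error> {
    Ok(())
}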
27 files changed:
Cargo.toml
Makefile
pbs-runtime/Cargo.toml [new file with mode: 0644]
pbs-runtime/src/lib.rs [new file with mode: 0644]
src/api2/backup/mod.rs
src/api2/backup/upload_chunk.rs
src/api2/node/apt.rs
src/api2/reader/mod.rs
src/backup/catalog_shell.rs
src/bin/proxmox-backup-api.rs
src/bin/proxmox-backup-client.rs
src/bin/proxmox-backup-manager.rs
src/bin/proxmox-backup-proxy.rs
src/bin/proxmox-daily-update.rs
src/bin/proxmox-file-restore.rs
src/bin/proxmox-restore-daemon.rs
src/bin/proxmox-tape.rs
src/bin/proxmox_backup_client/mount.rs
src/bin/proxmox_client_tools/mod.rs
src/bin/pxar.rs
src/client/pull.rs
src/client/pxar_backup_stream.rs
src/client/remote_chunk_reader.rs
src/tools/mod.rs
src/tools/runtime.rs [deleted file]
src/tools/subscription.rs
src/tools/wrapped_reader_stream.rs

index fa6d501909433b5392146755a6f852a101349207..b9c31550b296827c0f2a418abad5e475fd8bdeba 100644 (file)
@@ -22,6 +22,7 @@ exclude = [ "build", "debian", "tests/catar_data/test_symlink/symlink1"]
 [workspace]
 members = [
     "pbs-buildcfg",
+    "pbs-runtime",
 ]
 
 [lib]
@@ -91,7 +92,8 @@ proxmox-http = { version = "0.2.1", features = [ "client", "http-helpers", "webs
 #proxmox-http = { version = "0.2.0", path = "../proxmox/proxmox-http", features = [ "client", "http-helpers", "websocket" ] }
 proxmox-openid = "0.6.0"
 
-pbs-buildcfg = { path = "pbs-buildcfg", version = "0.1" }
+pbs-buildcfg = { path = "pbs-buildcfg" }
+pbs-runtime = { path = "pbs-runtime" }
 
 [features]
 default = []
index 48296f9398114edc7720fca5e779e5953d941541..f6915c5f3b5663bbb0da533b9040bd04eff52997 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -31,7 +31,8 @@ RESTORE_BIN := \
        proxmox-restore-daemon
 
 SUBCRATES := \
-       pbs-buildcfg
+       pbs-buildcfg \
+       pbs-runtime
 
 ifeq ($(BUILD_MODE), release)
 CARGO_BUILD_ARGS += --release
diff --git a/pbs-runtime/Cargo.toml b/pbs-runtime/Cargo.toml
new file mode 100644 (file)
index 0000000..72c25ac
--- /dev/null
@@ -0,0 +1,11 @@
+[package]
+name = "pbs-runtime"
+version = "0.1.0"
+authors = ["Proxmox Support Team <support@proxmox.com>"]
+edition = "2018"
+description = "tokio runtime related helpers required for binaries"
+
+[dependencies]
+lazy_static = "1.4"
+pin-utils = "0.1.0"
+tokio = { version = "1.6", features = [ "rt", "rt-multi-thread" ] }
diff --git a/pbs-runtime/src/lib.rs b/pbs-runtime/src/lib.rs
new file mode 100644 (file)
index 0000000..477d26d
--- /dev/null
@@ -0,0 +1,202 @@
+//! Helpers for quirks of the current tokio runtime.
+
+use std::cell::RefCell;
+use std::future::Future;
+use std::sync::{Arc, Weak, Mutex};
+use std::task::{Context, Poll, RawWaker, Waker};
+use std::thread::{self, Thread};
+
+use lazy_static::lazy_static;
+use pin_utils::pin_mut;
+use tokio::runtime::{self, Runtime};
+
+thread_local! {
+    static BLOCKING: RefCell<bool> = RefCell::new(false);
+}
+
+fn is_in_tokio() -> bool {
+    tokio::runtime::Handle::try_current()
+        .is_ok()
+}
+
+fn is_blocking() -> bool {
+    BLOCKING.with(|v| *v.borrow())
+}
+
+struct BlockingGuard(bool);
+
+impl BlockingGuard {
+    fn set() -> Self {
+        Self(BLOCKING.with(|v| {
+            let old = *v.borrow();
+            *v.borrow_mut() = true;
+            old
+        }))
+    }
+}
+
+impl Drop for BlockingGuard {
+    fn drop(&mut self) {
+        BLOCKING.with(|v| {
+            *v.borrow_mut() = self.0;
+        });
+    }
+}
+
+lazy_static! {
+    // avoid openssl bug: https://github.com/openssl/openssl/issues/6214
+    // by dropping the runtime as early as possible
+    static ref RUNTIME: Mutex<Weak<Runtime>> = Mutex::new(Weak::new());
+}
+
+extern {
+    fn OPENSSL_thread_stop();
+}
+
+/// Get or create the current main tokio runtime.
+///
+/// This makes sure that tokio's worker threads are marked for us so that we know whether we
+/// can/need to use `block_in_place` in our `block_on` helper.
+pub fn get_runtime_with_builder<F: Fn() -> runtime::Builder>(get_builder: F) -> Arc<Runtime> {
+
+    let mut guard = RUNTIME.lock().unwrap();
+
+    if let Some(rt) = guard.upgrade() { return rt; }
+
+    let mut builder = get_builder();
+    builder.on_thread_stop(|| {
+        // avoid openssl bug: https://github.com/openssl/openssl/issues/6214
+        // call OPENSSL_thread_stop to avoid race with openssl cleanup handlers
+        unsafe { OPENSSL_thread_stop(); }
+    });
+
+    let runtime = builder.build().expect("failed to spawn tokio runtime");
+    let rt = Arc::new(runtime);
+
+    *guard = Arc::downgrade(&rt);
+
+    rt
+}
+
+/// Get or create the current main tokio runtime.
+///
+/// This calls get_runtime_with_builder() using the tokio default threaded scheduler
+pub fn get_runtime() -> Arc<Runtime> {
+
+    get_runtime_with_builder(|| {
+        let mut builder = runtime::Builder::new_multi_thread();
+        builder.enable_all();
+        builder
+    })
+}
+
+
+/// Block on a synchronous piece of code.
+pub fn block_in_place<R>(fut: impl FnOnce() -> R) -> R {
+    // don't double-exit the context (tokio doesn't like that)
+    // also, if we're not actually in a tokio-worker we must not use block_in_place() either
+    if is_blocking() || !is_in_tokio() {
+        fut()
+    } else {
+        // we are in an actual tokio worker thread, block it:
+        tokio::task::block_in_place(move || {
+            let _guard = BlockingGuard::set();
+            fut()
+        })
+    }
+}
+
+/// Block on a future in this thread.
+pub fn block_on<F: Future>(fut: F) -> F::Output {
+    // don't double-exit the context (tokio doesn't like that)
+    if is_blocking() {
+        block_on_local_future(fut)
+    } else if is_in_tokio() {
+        // inside a tokio worker we need to tell tokio that we're about to really block:
+        tokio::task::block_in_place(move || {
+            let _guard = BlockingGuard::set();
+            block_on_local_future(fut)
+        })
+    } else {
+        // not a worker thread, not associated with a runtime, make sure we have a runtime (spawn
+        // it on demand if necessary), then enter it
+        let _guard = BlockingGuard::set();
+        let _enter_guard = get_runtime().enter();
+        get_runtime().block_on(fut)
+    }
+}
+
+/*
+fn block_on_impl<F>(mut fut: F) -> F::Output
+where
+    F: Future + Send,
+    F::Output: Send + 'static,
+{
+    let (tx, rx) = tokio::sync::oneshot::channel();
+    let fut_ptr = &mut fut as *mut F as usize; // hack to not require F to be 'static
+    tokio::spawn(async move {
+        let fut: F = unsafe { std::ptr::read(fut_ptr as *mut F) };
+        tx
+            .send(fut.await)
+            .map_err(drop)
+            .expect("failed to send block_on result to channel")
+    });
+
+    futures::executor::block_on(async move {
+        rx.await.expect("failed to receive block_on result from channel")
+    })
+    std::mem::forget(fut);
+}
+*/
+
+/// This used to be our tokio main entry point. Now this just calls out to `block_on` for
+/// compatibility, which will perform all the necessary tasks on-demand anyway.
+pub fn main<F: Future>(fut: F) -> F::Output {
+    block_on(fut)
+}
+
+fn block_on_local_future<F: Future>(fut: F) -> F::Output {
+    pin_mut!(fut);
+
+    let waker = Arc::new(thread::current());
+    let waker = thread_waker_clone(Arc::into_raw(waker) as *const ());
+    let waker = unsafe { Waker::from_raw(waker) };
+    let mut context = Context::from_waker(&waker);
+    loop {
+        match fut.as_mut().poll(&mut context) {
+            Poll::Ready(out) => return out,
+            Poll::Pending => thread::park(),
+        }
+    }
+}
+
+const THREAD_WAKER_VTABLE: std::task::RawWakerVTable = std::task::RawWakerVTable::new(
+    thread_waker_clone,
+    thread_waker_wake,
+    thread_waker_wake_by_ref,
+    thread_waker_drop,
+);
+
+fn thread_waker_clone(this: *const ()) -> RawWaker {
+    let this = unsafe { Arc::from_raw(this as *const Thread) };
+    let cloned = Arc::clone(&this);
+    let _ = Arc::into_raw(this);
+
+    RawWaker::new(Arc::into_raw(cloned) as *const (), &THREAD_WAKER_VTABLE)
+}
+
+fn thread_waker_wake(this: *const ()) {
+    let this = unsafe { Arc::from_raw(this as *const Thread) };
+    this.unpark();
+}
+
+fn thread_waker_wake_by_ref(this: *const ()) {
+    let this = unsafe { Arc::from_raw(this as *const Thread) };
+    this.unpark();
+    let _ = Arc::into_raw(this);
+}
+
+fn thread_waker_drop(this: *const ()) {
+    let this = unsafe { Arc::from_raw(this as *const Thread) };
+    drop(this);
+}
index 9962cdcb392e3348c629033d1a42c2a83d031741..36fdc1b20c2766b6787b068402f4bf9379def849 100644 (file)
@@ -218,7 +218,7 @@ async move {
             };
             if benchmark {
                 env.log("benchmark finished successfully");
-                tools::runtime::block_in_place(|| env.remove_backup())?;
+                pbs_runtime::block_in_place(|| env.remove_backup())?;
                 return Ok(());
             }
 
@@ -246,13 +246,13 @@ async move {
                 (Ok(_), Err(err)) => {
                     env.log(format!("backup ended and finish failed: {}", err));
                     env.log("removing unfinished backup");
-                    tools::runtime::block_in_place(|| env.remove_backup())?;
+                    pbs_runtime::block_in_place(|| env.remove_backup())?;
                     Err(err)
                 },
                 (Err(err), Err(_)) => {
                     env.log(format!("backup failed: {}", err));
                     env.log("removing failed backup");
-                    tools::runtime::block_in_place(|| env.remove_backup())?;
+                    pbs_runtime::block_in_place(|| env.remove_backup())?;
                     Err(err)
                 },
             }
index cdd38c87e41bfb1eef997ba60de91ff91d4db9ae..1e8f3a797c253cef9365fad7f66c3bc7f62d08d0 100644 (file)
@@ -61,7 +61,7 @@ impl Future for UploadChunk {
                         let (is_duplicate, compressed_size) = match proxmox::try_block! {
                             let mut chunk = DataBlob::from_raw(raw_data)?;
 
-                            tools::runtime::block_in_place(|| {
+                            pbs_runtime::block_in_place(|| {
                                 chunk.verify_unencrypted(this.size as usize, &this.digest)?;
 
                                 // always comput CRC at server side
index 35c9a79ab6e1b68ed69c6970e4862278e60c9e0a..054c8813e7f18b4d34b0762a5ef506df253f57b4 100644 (file)
@@ -236,7 +236,7 @@ fn apt_get_changelog(
     let changelog_url = &pkg_info[0].change_log_url;
     // FIXME: use 'apt-get changelog' for proxmox packages as well, once repo supports it
     if changelog_url.starts_with("http://download.proxmox.com/") {
-        let changelog = crate::tools::runtime::block_on(client.get_string(changelog_url, None))
+        let changelog = pbs_runtime::block_on(client.get_string(changelog_url, None))
             .map_err(|err| format_err!("Error downloading changelog from '{}': {}", changelog_url, err))?;
         Ok(json!(changelog))
 
@@ -260,7 +260,7 @@ fn apt_get_changelog(
         auth_header.insert("Authorization".to_owned(),
             format!("Basic {}", base64::encode(format!("{}:{}", key, id))));
 
-        let changelog = crate::tools::runtime::block_on(client.get_string(changelog_url, Some(&auth_header)))
+        let changelog = pbs_runtime::block_on(client.get_string(changelog_url, Some(&auth_header)))
             .map_err(|err| format_err!("Error downloading changelog from '{}': {}", changelog_url, err))?;
         Ok(json!(changelog))
 
index 648fe73dbbbfa3de9d1a69a2bc592cd8e40dad27..d0081fad9325f8a9e48ac1e9eea0ed105b685061 100644 (file)
@@ -322,7 +322,7 @@ fn download_chunk(
 
         env.debug(format!("download chunk {:?}", path));
 
-        let data = tools::runtime::block_in_place(|| std::fs::read(path))
+        let data = pbs_runtime::block_in_place(|| std::fs::read(path))
             .map_err(move |err| http_err!(BAD_REQUEST, "reading file {:?} failed: {}", path2, err))?;
 
         let body = Body::from(data);
index 11866c3067b85a6ef7f8340e7268259a9567f401..b186ac6c507e1e9a076a656592a83db3127c417b 100644 (file)
@@ -21,7 +21,7 @@ use pxar::{EntryKind, Metadata};
 use crate::backup::catalog::{self, DirEntryAttribute};
 use crate::pxar::fuse::{Accessor, FileEntry};
 use crate::pxar::Flags;
-use crate::tools::runtime::block_in_place;
+use pbs_runtime::block_in_place;
 use crate::tools::ControlFlow;
 
 type CatalogReader = crate::backup::CatalogReader<std::fs::File>;
index 625b7232df3d991e8f3ec71fcdab0e12b487e034..25ed030ac428ec0fb570b26cd3b0e71b645b087a 100644 (file)
@@ -18,7 +18,7 @@ use proxmox_backup::config;
 fn main() {
     proxmox_backup::tools::setup_safe_path_env();
 
-    if let Err(err) = proxmox_backup::tools::runtime::main(run()) {
+    if let Err(err) = pbs_runtime::main(run()) {
         eprintln!("Error: {}", err);
         std::process::exit(-1);
     }
index 578b2680460fc342d7f52991cb3ab68ee59ef601..bd52abbcbf5a4782843e2012c419549ff390e0f8 100644 (file)
@@ -1494,6 +1494,6 @@ fn main() {
 
     let rpcenv = CliEnvironment::new();
     run_cli_command(cmd_def, rpcenv, Some(|future| {
-        proxmox_backup::tools::runtime::main(future)
+        pbs_runtime::main(future)
     }));
 }
index 60b4c6c658a937553359d6e2ce6b56586ee00a80..a7e7eecf8c9dd42c63909a7e4a370799f10091db 100644 (file)
@@ -397,7 +397,7 @@ fn main() {
     let mut rpcenv = CliEnvironment::new();
     rpcenv.set_auth_id(Some(String::from("root@pam")));
 
-   proxmox_backup::tools::runtime::main(run_async_cli_command(cmd_def, rpcenv));
+   pbs_runtime::main(run_async_cli_command(cmd_def, rpcenv));
 }
 
 // shell completion helper
@@ -408,7 +408,7 @@ pub fn complete_remote_datastore_name(_arg: &str, param: &HashMap<String, String
     let _ = proxmox::try_block!({
         let remote = param.get("remote").ok_or_else(|| format_err!("no remote"))?;
 
-        let data = crate::tools::runtime::block_on(async move {
+        let data = pbs_runtime::block_on(async move {
             crate::api2::config::remote::scan_remote_datastores(remote.clone()).await
         })?;
 
index 728c0da55110ae5d5fe524d1d4b43d4d122d528a..d7c675bb475e928578d44efe2820d5556036b6a7 100644 (file)
@@ -65,7 +65,7 @@ fn main() -> Result<(), Error> {
         bail!("proxy not running as backup user or group (got uid {} gid {})", running_uid, running_gid);
     }
 
-    proxmox_backup::tools::runtime::main(run())
+    pbs_runtime::main(run())
 }
 
 async fn run() -> Result<(), Error> {
@@ -700,7 +700,7 @@ async fn schedule_task_log_rotate() {
 
                 if logrotate.rotate(max_size, None, Some(max_files))? {
                     println!("rotated access log, telling daemons to re-open log file");
-                    proxmox_backup::tools::runtime::block_on(command_reopen_logfiles())?;
+                    pbs_runtime::block_on(command_reopen_logfiles())?;
                     worker.log("API access log was rotated".to_string());
                 } else {
                     worker.log("API access log was not rotated".to_string());
@@ -787,7 +787,7 @@ async fn generate_host_stats(save: bool) {
     use proxmox_backup::config::datastore;
 
 
-    proxmox_backup::tools::runtime::block_in_place(move || {
+    pbs_runtime::block_in_place(move || {
 
         match read_proc_stat() {
             Ok(stat) => {
index be3bfe44a5db9c62034ccc76313c83eec7140643..b962b67f515d0cc9c64934aaa311a3adc9086879 100644 (file)
@@ -91,7 +91,7 @@ fn main() {
     let mut rpcenv = CliEnvironment::new();
     rpcenv.set_auth_id(Some(String::from("root@pam")));
 
-    if let Err(err) = proxmox_backup::tools::runtime::main(do_update(&mut rpcenv)) {
+    if let Err(err) = pbs_runtime::main(do_update(&mut rpcenv)) {
         eprintln!("error during update: {}", err);
         std::process::exit(1);
     }
index 5766b2798253d904df32ca9a06603aa7f1b63065..17033cf565e1d8f5fab913cea064e48c583c7a72 100644 (file)
@@ -473,6 +473,6 @@ fn main() {
     run_cli_command(
         cmd_def,
         rpcenv,
-        Some(|future| proxmox_backup::tools::runtime::main(future)),
+        Some(|future| pbs_runtime::main(future)),
     );
 }
index 420c668f94ae96dc3f481baa7751d88c0379ccde..3e59ccfad18dab134a79713ac2d8c8ee1ddb789d 100644 (file)
@@ -62,7 +62,7 @@ fn main() -> Result<(), Error> {
 
     info!("disk scan complete, starting main runtime...");
 
-    proxmox_backup::tools::runtime::main(run())
+    pbs_runtime::main(run())
 }
 
 async fn run() -> Result<(), Error> {
index 46bd4ecc7e857a7cc0aca83d8bb87e8102dc700a..c095fe436def48f275e3f6986d8ff9c660786d25 100644 (file)
@@ -1101,5 +1101,5 @@ fn main() {
     let mut rpcenv = CliEnvironment::new();
     rpcenv.set_auth_id(Some(String::from("root@pam")));
 
-    proxmox_backup::tools::runtime::main(run_async_cli_command(cmd_def, rpcenv));
+    pbs_runtime::main(run_async_cli_command(cmd_def, rpcenv));
 }
index d0f04f89718222080917bcfbd144152fea3c7a3c..21f78c32474f459183cabcaf9f8c287678adb540 100644 (file)
@@ -139,7 +139,7 @@ fn mount(
     if verbose {
         // This will stay in foreground with debug output enabled as None is
         // passed for the RawFd.
-        return proxmox_backup::tools::runtime::main(mount_do(param, None));
+        return pbs_runtime::main(mount_do(param, None));
     }
 
     // Process should be daemonized.
@@ -155,7 +155,7 @@ fn mount(
         Ok(ForkResult::Child) => {
             drop(pr);
             nix::unistd::setsid().unwrap();
-            proxmox_backup::tools::runtime::main(mount_do(param, Some(pw)))
+            pbs_runtime::main(mount_do(param, Some(pw)))
         }
         Err(_) => bail!("failed to daemonize process"),
     }
index 24d9fa6305f262b58124a972f7ea5556bf18a38d..00e5155af8103ce28434052168170a9bf974363f 100644 (file)
@@ -107,7 +107,7 @@ pub async fn try_get(repo: &BackupRepository, url: &str) -> Value {
 }
 
 pub fn complete_backup_group(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
-    proxmox_backup::tools::runtime::main(async { complete_backup_group_do(param).await })
+    pbs_runtime::main(async { complete_backup_group_do(param).await })
 }
 
 pub async fn complete_backup_group_do(param: &HashMap<String, String>) -> Vec<String> {
@@ -137,7 +137,7 @@ pub async fn complete_backup_group_do(param: &HashMap<String, String>) -> Vec<St
 }
 
 pub fn complete_group_or_snapshot(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
-    proxmox_backup::tools::runtime::main(async { complete_group_or_snapshot_do(arg, param).await })
+    pbs_runtime::main(async { complete_group_or_snapshot_do(arg, param).await })
 }
 
 pub async fn complete_group_or_snapshot_do(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
@@ -156,7 +156,7 @@ pub async fn complete_group_or_snapshot_do(arg: &str, param: &HashMap<String, St
 }
 
 pub fn complete_backup_snapshot(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
-    proxmox_backup::tools::runtime::main(async { complete_backup_snapshot_do(param).await })
+    pbs_runtime::main(async { complete_backup_snapshot_do(param).await })
 }
 
 pub async fn complete_backup_snapshot_do(param: &HashMap<String, String>) -> Vec<String> {
@@ -188,7 +188,7 @@ pub async fn complete_backup_snapshot_do(param: &HashMap<String, String>) -> Vec
 }
 
 pub fn complete_server_file_name(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
-    proxmox_backup::tools::runtime::main(async { complete_server_file_name_do(param).await })
+    pbs_runtime::main(async { complete_server_file_name_do(param).await })
 }
 
 pub async fn complete_server_file_name_do(param: &HashMap<String, String>) -> Vec<String> {
@@ -279,7 +279,7 @@ pub fn complete_chunk_size(_arg: &str, _param: &HashMap<String, String>) -> Vec<
 }
 
 pub fn complete_auth_id(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
-    proxmox_backup::tools::runtime::main(async { complete_auth_id_do(param).await })
+    pbs_runtime::main(async { complete_auth_id_do(param).await })
 }
 
 pub async fn complete_auth_id_do(param: &HashMap<String, String>) -> Vec<String> {
index 0bd17ee1790326ef5d5a3b0ee9a63bb308bc524c..34cae30cf56cd54d652435287043f32fb9094f9d 100644 (file)
@@ -491,6 +491,6 @@ fn main() {
 
     let rpcenv = CliEnvironment::new();
     run_cli_command(cmd_def, rpcenv, Some(|future| {
-        proxmox_backup::tools::runtime::main(future)
+        pbs_runtime::main(future)
     }));
 }
index 1ee0e0d1e0b5c533e59fb77426454573b4dffc8e..19f91961a728bcdfcfc714a0a5f9d2b0324803c6 100644 (file)
@@ -73,7 +73,7 @@ async fn pull_index_chunks<I: IndexFile>(
             let verify_and_write_channel = verify_and_write_channel.clone();
 
             Ok::<_, Error>(async move {
-                let chunk_exists = crate::tools::runtime::block_in_place(|| {
+                let chunk_exists = pbs_runtime::block_in_place(|| {
                     target.cond_touch_chunk(&info.digest, false)
                 })?;
                 if chunk_exists {
@@ -85,7 +85,7 @@ async fn pull_index_chunks<I: IndexFile>(
                 let raw_size = chunk.raw_size() as usize;
 
                 // decode, verify and write in a separate threads to maximize throughput
-                crate::tools::runtime::block_in_place(|| {
+                pbs_runtime::block_in_place(|| {
                     verify_and_write_channel.send((chunk, info.digest, info.size()))
                 })?;
 
index 035f735c7ab81bf81316fd60e35075820beb54b8..1d3fc228293163df1dd74a7455d2e73d05544cc8 100644 (file)
@@ -113,7 +113,7 @@ impl Stream for PxarBackupStream {
             }
         }
 
-        match crate::tools::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
+        match pbs_runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
             Ok(data) => Poll::Ready(Some(data)),
             Err(_) => {
                 let error = self.error.lock().unwrap();
index 06f693a2b64b369455130a058254c1285e341365..62e6feaaee6888c03acef57b2b8a38117fa8eafa 100644 (file)
@@ -7,7 +7,7 @@ use anyhow::{bail, Error};
 
 use super::BackupReader;
 use crate::backup::{AsyncReadChunk, CryptConfig, CryptMode, DataBlob, ReadChunk};
-use crate::tools::runtime::block_on;
+use pbs_runtime::block_on;
 
 /// Read chunks from remote host using ``BackupReader``
 #[derive(Clone)]
index 716b959350fc0aad063c83f1cb490887590227ec..23943990e175529a6acedeb07ee54f10614f2fe7 100644 (file)
@@ -47,7 +47,6 @@ pub mod loopdev;
 pub mod lru_cache;
 pub mod async_lru_cache;
 pub mod nom;
-pub mod runtime;
 pub mod serde_filter;
 pub mod statistics;
 pub mod subscription;
diff --git a/src/tools/runtime.rs b/src/tools/runtime.rs
deleted file mode 100644 (file)
index 477d26d..0000000
+++ /dev/null
@@ -1,202 +0,0 @@
-//! Helpers for quirks of the current tokio runtime.
-
-use std::cell::RefCell;
-use std::future::Future;
-use std::sync::{Arc, Weak, Mutex};
-use std::task::{Context, Poll, RawWaker, Waker};
-use std::thread::{self, Thread};
-
-use lazy_static::lazy_static;
-use pin_utils::pin_mut;
-use tokio::runtime::{self, Runtime};
-
-thread_local! {
-    static BLOCKING: RefCell<bool> = RefCell::new(false);
-}
-
-fn is_in_tokio() -> bool {
-    tokio::runtime::Handle::try_current()
-        .is_ok()
-}
-
-fn is_blocking() -> bool {
-    BLOCKING.with(|v| *v.borrow())
-}
-
-struct BlockingGuard(bool);
-
-impl BlockingGuard {
-    fn set() -> Self {
-        Self(BLOCKING.with(|v| {
-            let old = *v.borrow();
-            *v.borrow_mut() = true;
-            old
-        }))
-    }
-}
-
-impl Drop for BlockingGuard {
-    fn drop(&mut self) {
-        BLOCKING.with(|v| {
-            *v.borrow_mut() = self.0;
-        });
-    }
-}
-
-lazy_static! {
-    // avoid openssl bug: https://github.com/openssl/openssl/issues/6214
-    // by dropping the runtime as early as possible
-    static ref RUNTIME: Mutex<Weak<Runtime>> = Mutex::new(Weak::new());
-}
-
-extern {
-    fn OPENSSL_thread_stop();
-}
-
-/// Get or create the current main tokio runtime.
-///
-/// This makes sure that tokio's worker threads are marked for us so that we know whether we
-/// can/need to use `block_in_place` in our `block_on` helper.
-pub fn get_runtime_with_builder<F: Fn() -> runtime::Builder>(get_builder: F) -> Arc<Runtime> {
-
-    let mut guard = RUNTIME.lock().unwrap();
-
-    if let Some(rt) = guard.upgrade() { return rt; }
-
-    let mut builder = get_builder();
-    builder.on_thread_stop(|| {
-        // avoid openssl bug: https://github.com/openssl/openssl/issues/6214
-        // call OPENSSL_thread_stop to avoid race with openssl cleanup handlers
-        unsafe { OPENSSL_thread_stop(); }
-    });
-
-    let runtime = builder.build().expect("failed to spawn tokio runtime");
-    let rt = Arc::new(runtime);
-
-    *guard = Arc::downgrade(&rt);
-
-    rt
-}
-
-/// Get or create the current main tokio runtime.
-///
-/// This calls get_runtime_with_builder() using the tokio default threaded scheduler
-pub fn get_runtime() -> Arc<Runtime> {
-
-    get_runtime_with_builder(|| {
-        let mut builder = runtime::Builder::new_multi_thread();
-        builder.enable_all();
-        builder
-    })
-}
-
-
-/// Block on a synchronous piece of code.
-pub fn block_in_place<R>(fut: impl FnOnce() -> R) -> R {
-    // don't double-exit the context (tokio doesn't like that)
-    // also, if we're not actually in a tokio-worker we must not use block_in_place() either
-    if is_blocking() || !is_in_tokio() {
-        fut()
-    } else {
-        // we are in an actual tokio worker thread, block it:
-        tokio::task::block_in_place(move || {
-            let _guard = BlockingGuard::set();
-            fut()
-        })
-    }
-}
-
-/// Block on a future in this thread.
-pub fn block_on<F: Future>(fut: F) -> F::Output {
-    // don't double-exit the context (tokio doesn't like that)
-    if is_blocking() {
-        block_on_local_future(fut)
-    } else if is_in_tokio() {
-        // inside a tokio worker we need to tell tokio that we're about to really block:
-        tokio::task::block_in_place(move || {
-            let _guard = BlockingGuard::set();
-            block_on_local_future(fut)
-        })
-    } else {
-        // not a worker thread, not associated with a runtime, make sure we have a runtime (spawn
-        // it on demand if necessary), then enter it
-        let _guard = BlockingGuard::set();
-        let _enter_guard = get_runtime().enter();
-        get_runtime().block_on(fut)
-    }
-}
-
-/*
-fn block_on_impl<F>(mut fut: F) -> F::Output
-where
-    F: Future + Send,
-    F::Output: Send + 'static,
-{
-    let (tx, rx) = tokio::sync::oneshot::channel();
-    let fut_ptr = &mut fut as *mut F as usize; // hack to not require F to be 'static
-    tokio::spawn(async move {
-        let fut: F = unsafe { std::ptr::read(fut_ptr as *mut F) };
-        tx
-            .send(fut.await)
-            .map_err(drop)
-            .expect("failed to send block_on result to channel")
-    });
-
-    futures::executor::block_on(async move {
-        rx.await.expect("failed to receive block_on result from channel")
-    })
-    std::mem::forget(fut);
-}
-*/
-
-/// This used to be our tokio main entry point. Now this just calls out to `block_on` for
-/// compatibility, which will perform all the necessary tasks on-demand anyway.
-pub fn main<F: Future>(fut: F) -> F::Output {
-    block_on(fut)
-}
-
-fn block_on_local_future<F: Future>(fut: F) -> F::Output {
-    pin_mut!(fut);
-
-    let waker = Arc::new(thread::current());
-    let waker = thread_waker_clone(Arc::into_raw(waker) as *const ());
-    let waker = unsafe { Waker::from_raw(waker) };
-    let mut context = Context::from_waker(&waker);
-    loop {
-        match fut.as_mut().poll(&mut context) {
-            Poll::Ready(out) => return out,
-            Poll::Pending => thread::park(),
-        }
-    }
-}
-
-const THREAD_WAKER_VTABLE: std::task::RawWakerVTable = std::task::RawWakerVTable::new(
-    thread_waker_clone,
-    thread_waker_wake,
-    thread_waker_wake_by_ref,
-    thread_waker_drop,
-);
-
-fn thread_waker_clone(this: *const ()) -> RawWaker {
-    let this = unsafe { Arc::from_raw(this as *const Thread) };
-    let cloned = Arc::clone(&this);
-    let _ = Arc::into_raw(this);
-
-    RawWaker::new(Arc::into_raw(cloned) as *const (), &THREAD_WAKER_VTABLE)
-}
-
-fn thread_waker_wake(this: *const ()) {
-    let this = unsafe { Arc::from_raw(this as *const Thread) };
-    this.unpark();
-}
-
-fn thread_waker_wake_by_ref(this: *const ()) {
-    let this = unsafe { Arc::from_raw(this as *const Thread) };
-    this.unpark();
-    let _ = Arc::into_raw(this);
-}
-
-fn thread_waker_drop(this: *const ()) {
-    let this = unsafe { Arc::from_raw(this as *const Thread) };
-    drop(this);
-}
index 08917284da09df60f2a1d36435f220c8b7cf8115..425468f1088e7b9d327149b7b9f8d64365bb6340 100644 (file)
@@ -228,7 +228,7 @@ pub fn check_subscription(key: String, server_id: String) -> Result<Subscription
 
     let now = proxmox::tools::time::epoch_i64();
 
-    let (response, challenge) = tools::runtime::block_on(register_subscription(&key, &server_id, now))
+    let (response, challenge) = pbs_runtime::block_on(register_subscription(&key, &server_id, now))
         .map_err(|err| format_err!("Error checking subscription: {}", err))?;
 
     parse_register_response(&response, key, server_id, now, &challenge)
index 4b01b072cfe8cd8a87f7ad29ae391e75d51a8b8c..6217545f2fc49f6e2949951e0430ca799221ca74 100644 (file)
@@ -7,7 +7,7 @@ use tokio::io::{AsyncRead, ReadBuf};
 use futures::ready;
 use futures::stream::Stream;
 
-use crate::tools::runtime::block_in_place;
+use pbs_runtime::block_in_place;
 
 /// Wrapper struct to convert a Reader into a Stream
 pub struct WrappedReaderStream<R: Read + Unpin> {
@@ -108,7 +108,7 @@ mod test {
 
     #[test]
     fn test_wrapped_stream_reader() -> Result<(), Error> {
-        crate::tools::runtime::main(async {
+        pbs_runtime::main(async {
             run_wrapped_stream_reader_test().await
         })
     }