[workspace]
members = [
"pbs-buildcfg",
+ "pbs-runtime",
]
[lib]
#proxmox-http = { version = "0.2.0", path = "../proxmox/proxmox-http", features = [ "client", "http-helpers", "websocket" ] }
proxmox-openid = "0.6.0"
-pbs-buildcfg = { path = "pbs-buildcfg", version = "0.1" }
+pbs-buildcfg = { path = "pbs-buildcfg" }
+pbs-runtime = { path = "pbs-runtime" }
[features]
default = []
proxmox-restore-daemon
SUBCRATES := \
- pbs-buildcfg
+ pbs-buildcfg \
+ pbs-runtime
ifeq ($(BUILD_MODE), release)
CARGO_BUILD_ARGS += --release
--- /dev/null
+[package]
+name = "pbs-runtime"
+version = "0.1.0"
+authors = ["Proxmox Support Team <support@proxmox.com>"]
+edition = "2018"
+description = "tokio runtime related helpers required for binaries"
+
+[dependencies]
+lazy_static = "1.4"
+pin-utils = "0.1.0"
+tokio = { version = "1.6", features = [ "rt", "rt-multi-thread" ] }
--- /dev/null
+//! Helpers for quirks of the current tokio runtime.
+
+use std::cell::RefCell;
+use std::future::Future;
+use std::sync::{Arc, Weak, Mutex};
+use std::task::{Context, Poll, RawWaker, Waker};
+use std::thread::{self, Thread};
+
+use lazy_static::lazy_static;
+use pin_utils::pin_mut;
+use tokio::runtime::{self, Runtime};
+
+thread_local! {
+ static BLOCKING: RefCell<bool> = RefCell::new(false);
+}
+
+fn is_in_tokio() -> bool {
+ tokio::runtime::Handle::try_current()
+ .is_ok()
+}
+
+fn is_blocking() -> bool {
+ BLOCKING.with(|v| *v.borrow())
+}
+
+struct BlockingGuard(bool);
+
+impl BlockingGuard {
+ fn set() -> Self {
+ Self(BLOCKING.with(|v| {
+ let old = *v.borrow();
+ *v.borrow_mut() = true;
+ old
+ }))
+ }
+}
+
+impl Drop for BlockingGuard {
+ fn drop(&mut self) {
+ BLOCKING.with(|v| {
+ *v.borrow_mut() = self.0;
+ });
+ }
+}
+
+lazy_static! {
+ // avoid openssl bug: https://github.com/openssl/openssl/issues/6214
+ // by dropping the runtime as early as possible
+ static ref RUNTIME: Mutex<Weak<Runtime>> = Mutex::new(Weak::new());
+}
+
+extern {
+ fn OPENSSL_thread_stop();
+}
+
+/// Get or create the current main tokio runtime.
+///
+/// This makes sure that tokio's worker threads are marked for us so that we know whether we
+/// can/need to use `block_in_place` in our `block_on` helper.
+pub fn get_runtime_with_builder<F: Fn() -> runtime::Builder>(get_builder: F) -> Arc<Runtime> {
+
+ let mut guard = RUNTIME.lock().unwrap();
+
+ if let Some(rt) = guard.upgrade() { return rt; }
+
+ let mut builder = get_builder();
+ builder.on_thread_stop(|| {
+ // avoid openssl bug: https://github.com/openssl/openssl/issues/6214
+ // call OPENSSL_thread_stop to avoid race with openssl cleanup handlers
+ unsafe { OPENSSL_thread_stop(); }
+ });
+
+ let runtime = builder.build().expect("failed to spawn tokio runtime");
+ let rt = Arc::new(runtime);
+
+ *guard = Arc::downgrade(&rt);
+
+ rt
+}
+
+/// Get or create the current main tokio runtime.
+///
+/// This calls get_runtime_with_builder() using the tokio default threaded scheduler
+pub fn get_runtime() -> Arc<Runtime> {
+
+ get_runtime_with_builder(|| {
+ let mut builder = runtime::Builder::new_multi_thread();
+ builder.enable_all();
+ builder
+ })
+}
+
+
+/// Block on a synchronous piece of code.
+pub fn block_in_place<R>(fut: impl FnOnce() -> R) -> R {
+ // don't double-exit the context (tokio doesn't like that)
+ // also, if we're not actually in a tokio-worker we must not use block_in_place() either
+ if is_blocking() || !is_in_tokio() {
+ fut()
+ } else {
+ // we are in an actual tokio worker thread, block it:
+ tokio::task::block_in_place(move || {
+ let _guard = BlockingGuard::set();
+ fut()
+ })
+ }
+}
+
+/// Block on a future in this thread.
+pub fn block_on<F: Future>(fut: F) -> F::Output {
+ // don't double-exit the context (tokio doesn't like that)
+ if is_blocking() {
+ block_on_local_future(fut)
+ } else if is_in_tokio() {
+ // inside a tokio worker we need to tell tokio that we're about to really block:
+ tokio::task::block_in_place(move || {
+ let _guard = BlockingGuard::set();
+ block_on_local_future(fut)
+ })
+ } else {
+ // not a worker thread, not associated with a runtime, make sure we have a runtime (spawn
+ // it on demand if necessary), then enter it
+ let _guard = BlockingGuard::set();
+ let _enter_guard = get_runtime().enter();
+ get_runtime().block_on(fut)
+ }
+}
+
+/*
+fn block_on_impl<F>(mut fut: F) -> F::Output
+where
+ F: Future + Send,
+ F::Output: Send + 'static,
+{
+ let (tx, rx) = tokio::sync::oneshot::channel();
+ let fut_ptr = &mut fut as *mut F as usize; // hack to not require F to be 'static
+ tokio::spawn(async move {
+ let fut: F = unsafe { std::ptr::read(fut_ptr as *mut F) };
+ tx
+ .send(fut.await)
+ .map_err(drop)
+ .expect("failed to send block_on result to channel")
+ });
+
+ futures::executor::block_on(async move {
+ rx.await.expect("failed to receive block_on result from channel")
+ })
+ std::mem::forget(fut);
+}
+*/
+
+/// This used to be our tokio main entry point. Now this just calls out to `block_on` for
+/// compatibility, which will perform all the necessary tasks on-demand anyway.
+pub fn main<F: Future>(fut: F) -> F::Output {
+ block_on(fut)
+}
+
+fn block_on_local_future<F: Future>(fut: F) -> F::Output {
+ pin_mut!(fut);
+
+ let waker = Arc::new(thread::current());
+ let waker = thread_waker_clone(Arc::into_raw(waker) as *const ());
+ let waker = unsafe { Waker::from_raw(waker) };
+ let mut context = Context::from_waker(&waker);
+ loop {
+ match fut.as_mut().poll(&mut context) {
+ Poll::Ready(out) => return out,
+ Poll::Pending => thread::park(),
+ }
+ }
+}
+
+const THREAD_WAKER_VTABLE: std::task::RawWakerVTable = std::task::RawWakerVTable::new(
+ thread_waker_clone,
+ thread_waker_wake,
+ thread_waker_wake_by_ref,
+ thread_waker_drop,
+);
+
+fn thread_waker_clone(this: *const ()) -> RawWaker {
+ let this = unsafe { Arc::from_raw(this as *const Thread) };
+ let cloned = Arc::clone(&this);
+ let _ = Arc::into_raw(this);
+
+ RawWaker::new(Arc::into_raw(cloned) as *const (), &THREAD_WAKER_VTABLE)
+}
+
+fn thread_waker_wake(this: *const ()) {
+ let this = unsafe { Arc::from_raw(this as *const Thread) };
+ this.unpark();
+}
+
+fn thread_waker_wake_by_ref(this: *const ()) {
+ let this = unsafe { Arc::from_raw(this as *const Thread) };
+ this.unpark();
+ let _ = Arc::into_raw(this);
+}
+
+fn thread_waker_drop(this: *const ()) {
+ let this = unsafe { Arc::from_raw(this as *const Thread) };
+ drop(this);
+}
};
if benchmark {
env.log("benchmark finished successfully");
- tools::runtime::block_in_place(|| env.remove_backup())?;
+ pbs_runtime::block_in_place(|| env.remove_backup())?;
return Ok(());
}
(Ok(_), Err(err)) => {
env.log(format!("backup ended and finish failed: {}", err));
env.log("removing unfinished backup");
- tools::runtime::block_in_place(|| env.remove_backup())?;
+ pbs_runtime::block_in_place(|| env.remove_backup())?;
Err(err)
},
(Err(err), Err(_)) => {
env.log(format!("backup failed: {}", err));
env.log("removing failed backup");
- tools::runtime::block_in_place(|| env.remove_backup())?;
+ pbs_runtime::block_in_place(|| env.remove_backup())?;
Err(err)
},
}
let (is_duplicate, compressed_size) = match proxmox::try_block! {
let mut chunk = DataBlob::from_raw(raw_data)?;
- tools::runtime::block_in_place(|| {
+ pbs_runtime::block_in_place(|| {
chunk.verify_unencrypted(this.size as usize, &this.digest)?;
// always comput CRC at server side
let changelog_url = &pkg_info[0].change_log_url;
// FIXME: use 'apt-get changelog' for proxmox packages as well, once repo supports it
if changelog_url.starts_with("http://download.proxmox.com/") {
- let changelog = crate::tools::runtime::block_on(client.get_string(changelog_url, None))
+ let changelog = pbs_runtime::block_on(client.get_string(changelog_url, None))
.map_err(|err| format_err!("Error downloading changelog from '{}': {}", changelog_url, err))?;
Ok(json!(changelog))
auth_header.insert("Authorization".to_owned(),
format!("Basic {}", base64::encode(format!("{}:{}", key, id))));
- let changelog = crate::tools::runtime::block_on(client.get_string(changelog_url, Some(&auth_header)))
+ let changelog = pbs_runtime::block_on(client.get_string(changelog_url, Some(&auth_header)))
.map_err(|err| format_err!("Error downloading changelog from '{}': {}", changelog_url, err))?;
Ok(json!(changelog))
env.debug(format!("download chunk {:?}", path));
- let data = tools::runtime::block_in_place(|| std::fs::read(path))
+ let data = pbs_runtime::block_in_place(|| std::fs::read(path))
.map_err(move |err| http_err!(BAD_REQUEST, "reading file {:?} failed: {}", path2, err))?;
let body = Body::from(data);
use crate::backup::catalog::{self, DirEntryAttribute};
use crate::pxar::fuse::{Accessor, FileEntry};
use crate::pxar::Flags;
-use crate::tools::runtime::block_in_place;
+use pbs_runtime::block_in_place;
use crate::tools::ControlFlow;
type CatalogReader = crate::backup::CatalogReader<std::fs::File>;
fn main() {
proxmox_backup::tools::setup_safe_path_env();
- if let Err(err) = proxmox_backup::tools::runtime::main(run()) {
+ if let Err(err) = pbs_runtime::main(run()) {
eprintln!("Error: {}", err);
std::process::exit(-1);
}
let rpcenv = CliEnvironment::new();
run_cli_command(cmd_def, rpcenv, Some(|future| {
- proxmox_backup::tools::runtime::main(future)
+ pbs_runtime::main(future)
}));
}
let mut rpcenv = CliEnvironment::new();
rpcenv.set_auth_id(Some(String::from("root@pam")));
- proxmox_backup::tools::runtime::main(run_async_cli_command(cmd_def, rpcenv));
+ pbs_runtime::main(run_async_cli_command(cmd_def, rpcenv));
}
// shell completion helper
let _ = proxmox::try_block!({
let remote = param.get("remote").ok_or_else(|| format_err!("no remote"))?;
- let data = crate::tools::runtime::block_on(async move {
+ let data = pbs_runtime::block_on(async move {
crate::api2::config::remote::scan_remote_datastores(remote.clone()).await
})?;
bail!("proxy not running as backup user or group (got uid {} gid {})", running_uid, running_gid);
}
- proxmox_backup::tools::runtime::main(run())
+ pbs_runtime::main(run())
}
async fn run() -> Result<(), Error> {
if logrotate.rotate(max_size, None, Some(max_files))? {
println!("rotated access log, telling daemons to re-open log file");
- proxmox_backup::tools::runtime::block_on(command_reopen_logfiles())?;
+ pbs_runtime::block_on(command_reopen_logfiles())?;
worker.log("API access log was rotated".to_string());
} else {
worker.log("API access log was not rotated".to_string());
use proxmox_backup::config::datastore;
- proxmox_backup::tools::runtime::block_in_place(move || {
+ pbs_runtime::block_in_place(move || {
match read_proc_stat() {
Ok(stat) => {
let mut rpcenv = CliEnvironment::new();
rpcenv.set_auth_id(Some(String::from("root@pam")));
- if let Err(err) = proxmox_backup::tools::runtime::main(do_update(&mut rpcenv)) {
+ if let Err(err) = pbs_runtime::main(do_update(&mut rpcenv)) {
eprintln!("error during update: {}", err);
std::process::exit(1);
}
run_cli_command(
cmd_def,
rpcenv,
- Some(|future| proxmox_backup::tools::runtime::main(future)),
+ Some(|future| pbs_runtime::main(future)),
);
}
info!("disk scan complete, starting main runtime...");
- proxmox_backup::tools::runtime::main(run())
+ pbs_runtime::main(run())
}
async fn run() -> Result<(), Error> {
let mut rpcenv = CliEnvironment::new();
rpcenv.set_auth_id(Some(String::from("root@pam")));
- proxmox_backup::tools::runtime::main(run_async_cli_command(cmd_def, rpcenv));
+ pbs_runtime::main(run_async_cli_command(cmd_def, rpcenv));
}
if verbose {
// This will stay in foreground with debug output enabled as None is
// passed for the RawFd.
- return proxmox_backup::tools::runtime::main(mount_do(param, None));
+ return pbs_runtime::main(mount_do(param, None));
}
// Process should be daemonized.
Ok(ForkResult::Child) => {
drop(pr);
nix::unistd::setsid().unwrap();
- proxmox_backup::tools::runtime::main(mount_do(param, Some(pw)))
+ pbs_runtime::main(mount_do(param, Some(pw)))
}
Err(_) => bail!("failed to daemonize process"),
}
}
pub fn complete_backup_group(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
- proxmox_backup::tools::runtime::main(async { complete_backup_group_do(param).await })
+ pbs_runtime::main(async { complete_backup_group_do(param).await })
}
pub async fn complete_backup_group_do(param: &HashMap<String, String>) -> Vec<String> {
}
pub fn complete_group_or_snapshot(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
- proxmox_backup::tools::runtime::main(async { complete_group_or_snapshot_do(arg, param).await })
+ pbs_runtime::main(async { complete_group_or_snapshot_do(arg, param).await })
}
pub async fn complete_group_or_snapshot_do(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
}
pub fn complete_backup_snapshot(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
- proxmox_backup::tools::runtime::main(async { complete_backup_snapshot_do(param).await })
+ pbs_runtime::main(async { complete_backup_snapshot_do(param).await })
}
pub async fn complete_backup_snapshot_do(param: &HashMap<String, String>) -> Vec<String> {
}
pub fn complete_server_file_name(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
- proxmox_backup::tools::runtime::main(async { complete_server_file_name_do(param).await })
+ pbs_runtime::main(async { complete_server_file_name_do(param).await })
}
pub async fn complete_server_file_name_do(param: &HashMap<String, String>) -> Vec<String> {
}
pub fn complete_auth_id(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
- proxmox_backup::tools::runtime::main(async { complete_auth_id_do(param).await })
+ pbs_runtime::main(async { complete_auth_id_do(param).await })
}
pub async fn complete_auth_id_do(param: &HashMap<String, String>) -> Vec<String> {
let rpcenv = CliEnvironment::new();
run_cli_command(cmd_def, rpcenv, Some(|future| {
- proxmox_backup::tools::runtime::main(future)
+ pbs_runtime::main(future)
}));
}
let verify_and_write_channel = verify_and_write_channel.clone();
Ok::<_, Error>(async move {
- let chunk_exists = crate::tools::runtime::block_in_place(|| {
+ let chunk_exists = pbs_runtime::block_in_place(|| {
target.cond_touch_chunk(&info.digest, false)
})?;
if chunk_exists {
let raw_size = chunk.raw_size() as usize;
// decode, verify and write in a separate threads to maximize throughput
- crate::tools::runtime::block_in_place(|| {
+ pbs_runtime::block_in_place(|| {
verify_and_write_channel.send((chunk, info.digest, info.size()))
})?;
}
}
- match crate::tools::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
+ match pbs_runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
Ok(data) => Poll::Ready(Some(data)),
Err(_) => {
let error = self.error.lock().unwrap();
use super::BackupReader;
use crate::backup::{AsyncReadChunk, CryptConfig, CryptMode, DataBlob, ReadChunk};
-use crate::tools::runtime::block_on;
+use pbs_runtime::block_on;
/// Read chunks from remote host using ``BackupReader``
#[derive(Clone)]
pub mod lru_cache;
pub mod async_lru_cache;
pub mod nom;
-pub mod runtime;
pub mod serde_filter;
pub mod statistics;
pub mod subscription;
+++ /dev/null
-//! Helpers for quirks of the current tokio runtime.
-
-use std::cell::RefCell;
-use std::future::Future;
-use std::sync::{Arc, Weak, Mutex};
-use std::task::{Context, Poll, RawWaker, Waker};
-use std::thread::{self, Thread};
-
-use lazy_static::lazy_static;
-use pin_utils::pin_mut;
-use tokio::runtime::{self, Runtime};
-
-thread_local! {
- static BLOCKING: RefCell<bool> = RefCell::new(false);
-}
-
-fn is_in_tokio() -> bool {
- tokio::runtime::Handle::try_current()
- .is_ok()
-}
-
-fn is_blocking() -> bool {
- BLOCKING.with(|v| *v.borrow())
-}
-
-struct BlockingGuard(bool);
-
-impl BlockingGuard {
- fn set() -> Self {
- Self(BLOCKING.with(|v| {
- let old = *v.borrow();
- *v.borrow_mut() = true;
- old
- }))
- }
-}
-
-impl Drop for BlockingGuard {
- fn drop(&mut self) {
- BLOCKING.with(|v| {
- *v.borrow_mut() = self.0;
- });
- }
-}
-
-lazy_static! {
- // avoid openssl bug: https://github.com/openssl/openssl/issues/6214
- // by dropping the runtime as early as possible
- static ref RUNTIME: Mutex<Weak<Runtime>> = Mutex::new(Weak::new());
-}
-
-extern {
- fn OPENSSL_thread_stop();
-}
-
-/// Get or create the current main tokio runtime.
-///
-/// This makes sure that tokio's worker threads are marked for us so that we know whether we
-/// can/need to use `block_in_place` in our `block_on` helper.
-pub fn get_runtime_with_builder<F: Fn() -> runtime::Builder>(get_builder: F) -> Arc<Runtime> {
-
- let mut guard = RUNTIME.lock().unwrap();
-
- if let Some(rt) = guard.upgrade() { return rt; }
-
- let mut builder = get_builder();
- builder.on_thread_stop(|| {
- // avoid openssl bug: https://github.com/openssl/openssl/issues/6214
- // call OPENSSL_thread_stop to avoid race with openssl cleanup handlers
- unsafe { OPENSSL_thread_stop(); }
- });
-
- let runtime = builder.build().expect("failed to spawn tokio runtime");
- let rt = Arc::new(runtime);
-
- *guard = Arc::downgrade(&rt);
-
- rt
-}
-
-/// Get or create the current main tokio runtime.
-///
-/// This calls get_runtime_with_builder() using the tokio default threaded scheduler
-pub fn get_runtime() -> Arc<Runtime> {
-
- get_runtime_with_builder(|| {
- let mut builder = runtime::Builder::new_multi_thread();
- builder.enable_all();
- builder
- })
-}
-
-
-/// Block on a synchronous piece of code.
-pub fn block_in_place<R>(fut: impl FnOnce() -> R) -> R {
- // don't double-exit the context (tokio doesn't like that)
- // also, if we're not actually in a tokio-worker we must not use block_in_place() either
- if is_blocking() || !is_in_tokio() {
- fut()
- } else {
- // we are in an actual tokio worker thread, block it:
- tokio::task::block_in_place(move || {
- let _guard = BlockingGuard::set();
- fut()
- })
- }
-}
-
-/// Block on a future in this thread.
-pub fn block_on<F: Future>(fut: F) -> F::Output {
- // don't double-exit the context (tokio doesn't like that)
- if is_blocking() {
- block_on_local_future(fut)
- } else if is_in_tokio() {
- // inside a tokio worker we need to tell tokio that we're about to really block:
- tokio::task::block_in_place(move || {
- let _guard = BlockingGuard::set();
- block_on_local_future(fut)
- })
- } else {
- // not a worker thread, not associated with a runtime, make sure we have a runtime (spawn
- // it on demand if necessary), then enter it
- let _guard = BlockingGuard::set();
- let _enter_guard = get_runtime().enter();
- get_runtime().block_on(fut)
- }
-}
-
-/*
-fn block_on_impl<F>(mut fut: F) -> F::Output
-where
- F: Future + Send,
- F::Output: Send + 'static,
-{
- let (tx, rx) = tokio::sync::oneshot::channel();
- let fut_ptr = &mut fut as *mut F as usize; // hack to not require F to be 'static
- tokio::spawn(async move {
- let fut: F = unsafe { std::ptr::read(fut_ptr as *mut F) };
- tx
- .send(fut.await)
- .map_err(drop)
- .expect("failed to send block_on result to channel")
- });
-
- futures::executor::block_on(async move {
- rx.await.expect("failed to receive block_on result from channel")
- })
- std::mem::forget(fut);
-}
-*/
-
-/// This used to be our tokio main entry point. Now this just calls out to `block_on` for
-/// compatibility, which will perform all the necessary tasks on-demand anyway.
-pub fn main<F: Future>(fut: F) -> F::Output {
- block_on(fut)
-}
-
-fn block_on_local_future<F: Future>(fut: F) -> F::Output {
- pin_mut!(fut);
-
- let waker = Arc::new(thread::current());
- let waker = thread_waker_clone(Arc::into_raw(waker) as *const ());
- let waker = unsafe { Waker::from_raw(waker) };
- let mut context = Context::from_waker(&waker);
- loop {
- match fut.as_mut().poll(&mut context) {
- Poll::Ready(out) => return out,
- Poll::Pending => thread::park(),
- }
- }
-}
-
-const THREAD_WAKER_VTABLE: std::task::RawWakerVTable = std::task::RawWakerVTable::new(
- thread_waker_clone,
- thread_waker_wake,
- thread_waker_wake_by_ref,
- thread_waker_drop,
-);
-
-fn thread_waker_clone(this: *const ()) -> RawWaker {
- let this = unsafe { Arc::from_raw(this as *const Thread) };
- let cloned = Arc::clone(&this);
- let _ = Arc::into_raw(this);
-
- RawWaker::new(Arc::into_raw(cloned) as *const (), &THREAD_WAKER_VTABLE)
-}
-
-fn thread_waker_wake(this: *const ()) {
- let this = unsafe { Arc::from_raw(this as *const Thread) };
- this.unpark();
-}
-
-fn thread_waker_wake_by_ref(this: *const ()) {
- let this = unsafe { Arc::from_raw(this as *const Thread) };
- this.unpark();
- let _ = Arc::into_raw(this);
-}
-
-fn thread_waker_drop(this: *const ()) {
- let this = unsafe { Arc::from_raw(this as *const Thread) };
- drop(this);
-}
let now = proxmox::tools::time::epoch_i64();
- let (response, challenge) = tools::runtime::block_on(register_subscription(&key, &server_id, now))
+ let (response, challenge) = pbs_runtime::block_on(register_subscription(&key, &server_id, now))
.map_err(|err| format_err!("Error checking subscription: {}", err))?;
parse_register_response(&response, key, server_id, now, &challenge)
use futures::ready;
use futures::stream::Stream;
-use crate::tools::runtime::block_in_place;
+use pbs_runtime::block_in_place;
/// Wrapper struct to convert a Reader into a Stream
pub struct WrappedReaderStream<R: Read + Unpin> {
#[test]
fn test_wrapped_stream_reader() -> Result<(), Error> {
- crate::tools::runtime::main(async {
+ pbs_runtime::main(async {
run_wrapped_stream_reader_test().await
})
}