]> git.proxmox.com Git - proxmox-backup.git/blame - src/tools/runtime.rs
introduce new runtime tokio helpers
[proxmox-backup.git] / src / tools / runtime.rs
CommitLineData
daef93f4
WB
//! Helpers for quirks of the current tokio runtime.
2
d973aa82 3use std::cell::RefCell;
daef93f4
WB
4use std::future::Future;
5
d973aa82
WB
6use lazy_static::lazy_static;
7use tokio::runtime::{self, Runtime};
8
thread_local! {
    /// Per-thread flag that `RuntimeGuard::enter` sets to `true`; the guard's `Drop`
    /// implementation writes the previous value back.
    static HAS_RUNTIME: RefCell<bool> = RefCell::new(false);
    /// Per-thread flag that the `on_thread_start` hook of `RUNTIME` sets to `true`, i.e. it is
    /// set on threads started by that tokio runtime.
    static IN_TOKIO: RefCell<bool> = RefCell::new(false);
}
13
14fn is_in_tokio() -> bool {
15 IN_TOKIO.with(|v| *v.borrow())
16}
17
18fn has_runtime() -> bool {
19 HAS_RUNTIME.with(|v| *v.borrow())
20}
21
/// Guard holding the value the thread-local `HAS_RUNTIME` flag had before
/// `RuntimeGuard::enter` set it; dropping the guard writes that value back.
#[must_use = "the previous `HAS_RUNTIME` value is restored as soon as the guard is dropped"]
struct RuntimeGuard(bool);
23
24impl RuntimeGuard {
25 fn enter() -> Self {
26 Self(HAS_RUNTIME.with(|v| {
27 let old = *v.borrow();
28 *v.borrow_mut() = true;
29 old
30 }))
31 }
32}
33
34impl Drop for RuntimeGuard {
35 fn drop(&mut self) {
36 HAS_RUNTIME.with(|v| {
37 *v.borrow_mut() = self.0;
38 });
39 }
40}
41
42lazy_static! {
43 static ref RUNTIME: Runtime = {
44 runtime::Builder::new()
45 .threaded_scheduler()
46 .enable_all()
47 .on_thread_start(|| IN_TOKIO.with(|v| *v.borrow_mut() = true))
48 .build()
49 .expect("failed to spawn tokio runtime")
50 };
51}
52
53/// Get or create the current main tokio runtime.
54///
55/// This makes sure that tokio's worker threads are marked for us so that we know whether we
56/// can/need to use `block_in_place` in our `block_on` helper.
57pub fn get_runtime() -> &'static Runtime {
58 &RUNTIME
59}
60
61/// Associate the current newly spawned thread with the main tokio runtime.
62pub fn enter_runtime<R>(f: impl FnOnce() -> R) -> R {
63 let _guard = RuntimeGuard::enter();
64 get_runtime().enter(f)
65}
66
67/// Block on a synchronous piece of code.
68pub fn block_in_place<R>(fut: impl FnOnce() -> R) -> R {
69 if is_in_tokio() {
70 // we are in an actual tokio worker thread, block it:
71 tokio::task::block_in_place(fut)
72 } else {
73 // we're not inside a tokio worker, so just run the code:
74 fut()
75 }
76}
77
78/// Block on a future in this thread.
79pub fn block_on<R, F>(fut: F) -> R
daef93f4 80where
d973aa82
WB
81 R: Send + 'static,
82 F: Future<Output = R> + Send,
daef93f4 83{
daef93f4 84
d973aa82
WB
85 if is_in_tokio() {
86 // inside a tokio worker we need to tell tokio that we're about to really block:
87 tokio::task::block_in_place(move || futures::executor::block_on(fut))
88 } else if has_runtime() {
89 // we're already associated with a runtime, but we're not a worker-thread, we can just
90 // block this thread directly
91 // This is not strictly necessary, but it's a bit quicker tha the else branch below.
92 futures::executor::block_on(fut)
93 } else {
94 // not a worker thread, not associated with a runtime, make sure we have a runtime (spawn
95 // it on demand if necessary), then enter it:
96 enter_runtime(move || futures::executor::block_on(fut))
97 }
98}
daef93f4 99
d973aa82
WB
/*
fn block_on_impl<F>(mut fut: F) -> F::Output
where
    F: Future + Send,
    F::Output: Send + 'static,
{
    let (tx, rx) = tokio::sync::oneshot::channel();
    let fut_ptr = &mut fut as *mut F as usize; // hack to not require F to be 'static
    tokio::spawn(async move {
        let fut: F = unsafe { std::ptr::read(fut_ptr as *mut F) };
        tx
            .send(fut.await)
            .map_err(drop)
            .expect("failed to send block_on result to channel")
    });

    futures::executor::block_on(async move {
        rx.await.expect("failed to receive block_on result from channel")
    })
    std::mem::forget(fut);
}
*/
122
123/// This used to be our tokio main entry point. Now this just calls out to `block_on` for
124/// compatibility, which will perform all the necessary tasks on-demand anyway.
125pub fn main<F>(fut: F) -> F::Output
126where
127 F: Future + Send,
128 F::Output: Send + 'static,
129{
130 block_on(fut)
daef93f4 131}