]> git.proxmox.com Git - proxmox-backup.git/blob - src/tools/runtime.rs
introduce new runtime tokio helpers
[proxmox-backup.git] / src / tools / runtime.rs
1 //! Helpers for quirks of the current tokio runtime.
2
3 use std::cell::RefCell;
4 use std::future::Future;
5
6 use lazy_static::lazy_static;
7 use tokio::runtime::{self, Runtime};
8
thread_local! {
    /// Per-thread flag read by `is_in_tokio()`. It starts out `false` and is switched to `true`
    /// by the `on_thread_start` hook installed on the runtime builder in this file.
    static IN_TOKIO: RefCell<bool> = RefCell::new(false);

    /// Per-thread flag read by `has_runtime()`. It starts out `false`; `RuntimeGuard` sets it
    /// while alive and restores the previous value when dropped.
    static HAS_RUNTIME: RefCell<bool> = RefCell::new(false);
}
13
14 fn is_in_tokio() -> bool {
15 IN_TOKIO.with(|v| *v.borrow())
16 }
17
18 fn has_runtime() -> bool {
19 HAS_RUNTIME.with(|v| *v.borrow())
20 }
21
22 struct RuntimeGuard(bool);
23
24 impl RuntimeGuard {
25 fn enter() -> Self {
26 Self(HAS_RUNTIME.with(|v| {
27 let old = *v.borrow();
28 *v.borrow_mut() = true;
29 old
30 }))
31 }
32 }
33
34 impl Drop for RuntimeGuard {
35 fn drop(&mut self) {
36 HAS_RUNTIME.with(|v| {
37 *v.borrow_mut() = self.0;
38 });
39 }
40 }
41
42 lazy_static! {
43 static ref RUNTIME: Runtime = {
44 runtime::Builder::new()
45 .threaded_scheduler()
46 .enable_all()
47 .on_thread_start(|| IN_TOKIO.with(|v| *v.borrow_mut() = true))
48 .build()
49 .expect("failed to spawn tokio runtime")
50 };
51 }
52
53 /// Get or create the current main tokio runtime.
54 ///
55 /// This makes sure that tokio's worker threads are marked for us so that we know whether we
56 /// can/need to use `block_in_place` in our `block_on` helper.
57 pub fn get_runtime() -> &'static Runtime {
58 &RUNTIME
59 }
60
61 /// Associate the current newly spawned thread with the main tokio runtime.
62 pub fn enter_runtime<R>(f: impl FnOnce() -> R) -> R {
63 let _guard = RuntimeGuard::enter();
64 get_runtime().enter(f)
65 }
66
67 /// Block on a synchronous piece of code.
68 pub fn block_in_place<R>(fut: impl FnOnce() -> R) -> R {
69 if is_in_tokio() {
70 // we are in an actual tokio worker thread, block it:
71 tokio::task::block_in_place(fut)
72 } else {
73 // we're not inside a tokio worker, so just run the code:
74 fut()
75 }
76 }
77
78 /// Block on a future in this thread.
79 pub fn block_on<R, F>(fut: F) -> R
80 where
81 R: Send + 'static,
82 F: Future<Output = R> + Send,
83 {
84
85 if is_in_tokio() {
86 // inside a tokio worker we need to tell tokio that we're about to really block:
87 tokio::task::block_in_place(move || futures::executor::block_on(fut))
88 } else if has_runtime() {
89 // we're already associated with a runtime, but we're not a worker-thread, we can just
90 // block this thread directly
91 // This is not strictly necessary, but it's a bit quicker tha the else branch below.
92 futures::executor::block_on(fut)
93 } else {
94 // not a worker thread, not associated with a runtime, make sure we have a runtime (spawn
95 // it on demand if necessary), then enter it:
96 enter_runtime(move || futures::executor::block_on(fut))
97 }
98 }
99
100 /*
101 fn block_on_impl<F>(mut fut: F) -> F::Output
102 where
103 F: Future + Send,
104 F::Output: Send + 'static,
105 {
106 let (tx, rx) = tokio::sync::oneshot::channel();
107 let fut_ptr = &mut fut as *mut F as usize; // hack to not require F to be 'static
108 tokio::spawn(async move {
109 let fut: F = unsafe { std::ptr::read(fut_ptr as *mut F) };
110 tx
111 .send(fut.await)
112 .map_err(drop)
113 .expect("failed to send block_on result to channel")
114 });
115
116 futures::executor::block_on(async move {
117 rx.await.expect("failed to receive block_on result from channel")
118 })
119 std::mem::forget(fut);
120 }
121 */
122
123 /// This used to be our tokio main entry point. Now this just calls out to `block_on` for
124 /// compatibility, which will perform all the necessary tasks on-demand anyway.
125 pub fn main<F>(fut: F) -> F::Output
126 where
127 F: Future + Send,
128 F::Output: Send + 'static,
129 {
130 block_on(fut)
131 }