]>
Commit | Line | Data |
---|---|---|
daef93f4 WB |
1 | //! Helpers for quirks of the current tokio runtime. |
2 | ||
d973aa82 | 3 | use std::cell::RefCell; |
daef93f4 WB |
4 | use std::future::Future; |
5 | ||
d973aa82 WB |
6 | use lazy_static::lazy_static; |
7 | use tokio::runtime::{self, Runtime}; | |
8 | ||
thread_local! {
    /// Per-thread marker that is switched on by the `on_thread_start` hook of the main runtime,
    /// i.e. it is `true` only on threads that the runtime itself started.
    static IN_TOKIO: RefCell<bool> = RefCell::new(false);

    /// Per-thread marker that is switched on while a `RuntimeGuard` is alive on this thread and
    /// restored to its previous value when that guard is dropped.
    static HAS_RUNTIME: RefCell<bool> = RefCell::new(false);
}
13 | ||
14 | fn is_in_tokio() -> bool { | |
15 | IN_TOKIO.with(|v| *v.borrow()) | |
16 | } | |
17 | ||
18 | fn has_runtime() -> bool { | |
19 | HAS_RUNTIME.with(|v| *v.borrow()) | |
20 | } | |
21 | ||
22 | struct RuntimeGuard(bool); | |
23 | ||
24 | impl RuntimeGuard { | |
25 | fn enter() -> Self { | |
26 | Self(HAS_RUNTIME.with(|v| { | |
27 | let old = *v.borrow(); | |
28 | *v.borrow_mut() = true; | |
29 | old | |
30 | })) | |
31 | } | |
32 | } | |
33 | ||
34 | impl Drop for RuntimeGuard { | |
35 | fn drop(&mut self) { | |
36 | HAS_RUNTIME.with(|v| { | |
37 | *v.borrow_mut() = self.0; | |
38 | }); | |
39 | } | |
40 | } | |
41 | ||
42 | lazy_static! { | |
43 | static ref RUNTIME: Runtime = { | |
44 | runtime::Builder::new() | |
45 | .threaded_scheduler() | |
46 | .enable_all() | |
47 | .on_thread_start(|| IN_TOKIO.with(|v| *v.borrow_mut() = true)) | |
48 | .build() | |
49 | .expect("failed to spawn tokio runtime") | |
50 | }; | |
51 | } | |
52 | ||
53 | /// Get or create the current main tokio runtime. | |
54 | /// | |
55 | /// This makes sure that tokio's worker threads are marked for us so that we know whether we | |
56 | /// can/need to use `block_in_place` in our `block_on` helper. | |
57 | pub fn get_runtime() -> &'static Runtime { | |
58 | &RUNTIME | |
59 | } | |
60 | ||
61 | /// Associate the current newly spawned thread with the main tokio runtime. | |
62 | pub fn enter_runtime<R>(f: impl FnOnce() -> R) -> R { | |
63 | let _guard = RuntimeGuard::enter(); | |
64 | get_runtime().enter(f) | |
65 | } | |
66 | ||
67 | /// Block on a synchronous piece of code. | |
68 | pub fn block_in_place<R>(fut: impl FnOnce() -> R) -> R { | |
69 | if is_in_tokio() { | |
70 | // we are in an actual tokio worker thread, block it: | |
71 | tokio::task::block_in_place(fut) | |
72 | } else { | |
73 | // we're not inside a tokio worker, so just run the code: | |
74 | fut() | |
75 | } | |
76 | } | |
77 | ||
78 | /// Block on a future in this thread. | |
79 | pub fn block_on<R, F>(fut: F) -> R | |
daef93f4 | 80 | where |
d973aa82 WB |
81 | R: Send + 'static, |
82 | F: Future<Output = R> + Send, | |
daef93f4 | 83 | { |
daef93f4 | 84 | |
d973aa82 WB |
85 | if is_in_tokio() { |
86 | // inside a tokio worker we need to tell tokio that we're about to really block: | |
87 | tokio::task::block_in_place(move || futures::executor::block_on(fut)) | |
88 | } else if has_runtime() { | |
89 | // we're already associated with a runtime, but we're not a worker-thread, we can just | |
90 | // block this thread directly | |
91 | // This is not strictly necessary, but it's a bit quicker tha the else branch below. | |
92 | futures::executor::block_on(fut) | |
93 | } else { | |
94 | // not a worker thread, not associated with a runtime, make sure we have a runtime (spawn | |
95 | // it on demand if necessary), then enter it: | |
96 | enter_runtime(move || futures::executor::block_on(fut)) | |
97 | } | |
98 | } | |
daef93f4 | 99 | |
d973aa82 WB |
100 | /* |
101 | fn block_on_impl<F>(mut fut: F) -> F::Output | |
102 | where | |
103 | F: Future + Send, | |
104 | F::Output: Send + 'static, | |
105 | { | |
106 | let (tx, rx) = tokio::sync::oneshot::channel(); | |
107 | let fut_ptr = &mut fut as *mut F as usize; // hack to not require F to be 'static | |
108 | tokio::spawn(async move { | |
109 | let fut: F = unsafe { std::ptr::read(fut_ptr as *mut F) }; | |
110 | tx | |
111 | .send(fut.await) | |
112 | .map_err(drop) | |
113 | .expect("failed to send block_on result to channel") | |
114 | }); | |
115 | ||
116 | futures::executor::block_on(async move { | |
117 | rx.await.expect("failed to receive block_on result from channel") | |
daef93f4 | 118 | }) |
d973aa82 WB |
119 | std::mem::forget(fut); |
120 | } | |
121 | */ | |
122 | ||
123 | /// This used to be our tokio main entry point. Now this just calls out to `block_on` for | |
124 | /// compatibility, which will perform all the necessary tasks on-demand anyway. | |
125 | pub fn main<F>(fut: F) -> F::Output | |
126 | where | |
127 | F: Future + Send, | |
128 | F::Output: Send + 'static, | |
129 | { | |
130 | block_on(fut) | |
daef93f4 | 131 | } |