//! Helpers for quirks of the current tokio runtime.
3 use std
::cell
::RefCell
;
4 use std
::future
::Future
;
5 use std
::sync
::{Arc, Weak, Mutex}
;
6 use std
::task
::{Context, Poll, RawWaker, Waker}
;
7 use std
::thread
::{self, Thread}
;
9 use lazy_static
::lazy_static
;
10 use pin_utils
::pin_mut
;
11 use tokio
::runtime
::{self, Runtime}
;
thread_local! {
    /// Per-thread flag that is `true` while the current thread is inside one of our blocking
    /// sections.
    ///
    /// `BlockingGuard::set()` raises it and the guard's `Drop` restores the previous value;
    /// `is_blocking()` reads it. It has to be thread-local: it is accessed through `.with(..)`,
    /// and a `RefCell` is not `Sync`, so it could not be a plain `static`.
    static BLOCKING: RefCell<bool> = RefCell::new(false);
}
17 fn is_in_tokio() -> bool
{
18 tokio
::runtime
::Handle
::try_current()
22 fn is_blocking() -> bool
{
23 BLOCKING
.with(|v
| *v
.borrow())
26 struct BlockingGuard(bool
);
30 Self(BLOCKING
.with(|v
| {
31 let old
= *v
.borrow();
32 *v
.borrow_mut() = true;
38 impl Drop
for BlockingGuard
{
41 *v
.borrow_mut() = self.0;
47 // avoid openssl bug: https://github.com/openssl/openssl/issues/6214
48 // by dropping the runtime as early as possible
49 static ref RUNTIME
: Mutex
<Weak
<Runtime
>> = Mutex
::new(Weak
::new());
extern "C" {
    /// Foreign function invoked from tokio's `on_thread_stop` hook in
    /// `get_runtime_with_builder()`.
    ///
    /// NOTE(review): presumably OpenSSL's per-thread cleanup routine, which would require the
    /// final binary to link against OpenSSL — confirm against the OpenSSL documentation.
    fn OPENSSL_thread_stop();
}
56 /// Get or create the current main tokio runtime.
58 /// This makes sure that tokio's worker threads are marked for us so that we know whether we
59 /// can/need to use `block_in_place` in our `block_on` helper.
60 pub fn get_runtime_with_builder
<F
: Fn() -> runtime
::Builder
>(get_builder
: F
) -> Arc
<Runtime
> {
62 let mut guard
= RUNTIME
.lock().unwrap();
64 if let Some(rt
) = guard
.upgrade() { return rt; }
66 let mut builder
= get_builder();
67 builder
.on_thread_stop(|| {
68 // avoid openssl bug: https://github.com/openssl/openssl/issues/6214
69 // call OPENSSL_thread_stop to avoid race with openssl cleanup handlers
70 unsafe { OPENSSL_thread_stop(); }
73 let runtime
= builder
.build().expect("failed to spawn tokio runtime");
74 let rt
= Arc
::new(runtime
);
76 *guard
= Arc
::downgrade(&rt
);
81 /// Get or create the current main tokio runtime.
83 /// This calls get_runtime_with_builder() using the tokio default threaded scheduler
84 pub fn get_runtime() -> Arc
<Runtime
> {
86 get_runtime_with_builder(|| {
87 let mut builder
= runtime
::Builder
::new_multi_thread();
94 /// Block on a synchronous piece of code.
95 pub fn block_in_place
<R
>(fut
: impl FnOnce() -> R
) -> R
{
96 // don't double-exit the context (tokio doesn't like that)
97 // also, if we're not actually in a tokio-worker we must not use block_in_place() either
98 if is_blocking() || !is_in_tokio() {
101 // we are in an actual tokio worker thread, block it:
102 tokio
::task
::block_in_place(move || {
103 let _guard
= BlockingGuard
::set();
109 /// Block on a future in this thread.
110 pub fn block_on
<F
: Future
>(fut
: F
) -> F
::Output
{
111 // don't double-exit the context (tokio doesn't like that)
113 block_on_local_future(fut
)
114 } else if is_in_tokio() {
115 // inside a tokio worker we need to tell tokio that we're about to really block:
116 tokio
::task
::block_in_place(move || {
117 let _guard
= BlockingGuard
::set();
118 block_on_local_future(fut
)
121 // not a worker thread, not associated with a runtime, make sure we have a runtime (spawn
122 // it on demand if necessary), then enter it
123 let _guard
= BlockingGuard
::set();
124 let _enter_guard
= get_runtime().enter();
125 get_runtime().block_on(fut
)
// NOTE(review): only part of `block_on_impl` is visible here (the rest of the `where` clause,
// the body of the spawned task and the closing delimiters are missing from this view), so the
// code below is left untouched.
//
// What the visible lines show: the future is handed to `tokio::spawn` by casting its address to
// `usize` and `ptr::read`-ing it back inside the task (the "hack to not require F to be
// 'static"); the result comes back over a oneshot channel that the caller waits on with
// `futures::executor::block_on`; `mem::forget(fut)` keeps the original value from being dropped
// a second time after its bytes were read out.
//
// NOTE(review): nothing in the visible part of this file calls this function — presumably it is
// disabled/dead code; confirm before relying on it.
130 fn block_on_impl<F>(mut fut: F) -> F::Output
133 F::Output: Send + 'static,
135 let (tx, rx) = tokio::sync::oneshot::channel();
136 let fut_ptr = &mut fut as *mut F as usize; // hack to not require F to be 'static
137 tokio::spawn(async move {
138 let fut: F = unsafe { std::ptr::read(fut_ptr as *mut F) };
142 .expect("failed to send block_on result to channel")
145 futures::executor::block_on(async move {
146 rx.await.expect("failed to receive block_on result from channel")
148 std::mem::forget(fut);
152 /// This used to be our tokio main entry point. Now this just calls out to `block_on` for
153 /// compatibility, which will perform all the necessary tasks on-demand anyway.
154 pub fn main
<F
: Future
>(fut
: F
) -> F
::Output
{
158 fn block_on_local_future
<F
: Future
>(fut
: F
) -> F
::Output
{
161 let waker
= Arc
::new(thread
::current());
162 let waker
= thread_waker_clone(Arc
::into_raw(waker
) as *const ());
163 let waker
= unsafe { Waker::from_raw(waker) }
;
164 let mut context
= Context
::from_waker(&waker
);
166 match fut
.as_mut().poll(&mut context
) {
167 Poll
::Ready(out
) => return out
,
168 Poll
::Pending
=> thread
::park(),
173 const THREAD_WAKER_VTABLE
: std
::task
::RawWakerVTable
= std
::task
::RawWakerVTable
::new(
176 thread_waker_wake_by_ref
,
180 fn thread_waker_clone(this
: *const ()) -> RawWaker
{
181 let this
= unsafe { Arc::from_raw(this as *const Thread) }
;
182 let cloned
= Arc
::clone(&this
);
183 let _
= Arc
::into_raw(this
);
185 RawWaker
::new(Arc
::into_raw(cloned
) as *const (), &THREAD_WAKER_VTABLE
)
/// Vtable `wake` entry: unparks the thread and consumes the waker's reference.
///
/// `this` must be a pointer obtained from `Arc::<Thread>::into_raw`; the reference it stands for
/// is released when this function returns.
fn thread_waker_wake(this: *const ()) {
    // SAFETY: per the contract above `this` came from `Arc::<Thread>::into_raw`, and `wake`
    // takes over that reference, so rebuilding the `Arc` and letting it drop is correct.
    let this = unsafe { Arc::from_raw(this as *const Thread) };
    this.unpark();
}
/// Vtable `wake_by_ref` entry: unparks the thread without touching the reference count.
///
/// `this` must be a pointer obtained from `Arc::<Thread>::into_raw` whose reference is still
/// alive.
fn thread_waker_wake_by_ref(this: *const ()) {
    // SAFETY: `Arc::into_raw` yields a pointer to the `Thread` stored inside the `Arc`, and the
    // waker keeps that `Arc` alive for the duration of this call. Borrowing through the pointer
    // avoids rebuilding an `Arc` that would then have to be leaked again.
    let thread = unsafe { &*(this as *const Thread) };
    thread.unpark();
}
/// Vtable `drop` entry: releases the waker's reference to the `Thread`.
///
/// `this` must be a pointer obtained from `Arc::<Thread>::into_raw`.
fn thread_waker_drop(this: *const ()) {
    // SAFETY: per the contract above `this` came from `Arc::<Thread>::into_raw` and the waker
    // being dropped owns exactly that one reference.
    drop(unsafe { Arc::from_raw(this as *const Thread) });
}