]> git.proxmox.com Git - proxmox.git/commitdiff
proxmox-async: start new crate
authorDietmar Maurer <dietmar@proxmox.com>
Fri, 19 Nov 2021 15:03:09 +0000 (16:03 +0100)
committerDietmar Maurer <dietmar@proxmox.com>
Fri, 19 Nov 2021 15:42:11 +0000 (16:42 +0100)
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
Cargo.toml
Makefile
proxmox-async/Cargo.toml [new file with mode: 0644]
proxmox-async/debian/changelog [new file with mode: 0644]
proxmox-async/debian/debcargo.toml [new file with mode: 0644]
proxmox-async/src/blocking.rs [new file with mode: 0644]
proxmox-async/src/lib.rs [new file with mode: 0644]
proxmox-async/src/runtime.rs [new file with mode: 0644]

index 94e2838fa8bd76fb7f9043413d1c6a06beb653c5..a2f94ab6cc9890297a109ac31d559b43f276ad88 100644 (file)
@@ -2,6 +2,7 @@
 members = [
     "proxmox",
     "proxmox-api-macro",
+    "proxmox-async",
     "proxmox-borrow",
     "proxmox-http",
     "proxmox-io",
index 83b736e52debaf1b74d67935fff8e8b6d6c4405a..bdf901c0add2447cf463f4aceda9ffc5a3379f6b 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -3,6 +3,7 @@
 CRATES = \
         proxmox \
         proxmox-api-macro \
+        proxmox-async \
         proxmox-borrow \
         proxmox-http \
         proxmox-io \
diff --git a/proxmox-async/Cargo.toml b/proxmox-async/Cargo.toml
new file mode 100644 (file)
index 0000000..1c9a974
--- /dev/null
@@ -0,0 +1,16 @@
+[package]
+name = "proxmox-async"
+version = "0.1.0"
+authors = ["Proxmox Support Team <support@proxmox.com>"]
+edition = "2018"
+license = "AGPL-3"
+description = "Proxmox async/tokio helpers"
+
+exclude = [ "debian" ]
+
+[dependencies]
+anyhow = "1.0"
+futures = "0.3"
+lazy_static = "1.4"
+pin-utils = "0.1.0"
+tokio = { version = "1.0", features = ["rt", "rt-multi-thread"] }
diff --git a/proxmox-async/debian/changelog b/proxmox-async/debian/changelog
new file mode 100644 (file)
index 0000000..6e3fb01
--- /dev/null
@@ -0,0 +1,10 @@
+rust-proxmox-async (0.1.0) stable; urgency=medium
+  * imported pbs-tools/src/blocking.rs
+
+  * imported pbs-runtime/src/lib.rs to runtime.rs
+  
+  * initial release
+
+ -- root <root@elsa>  Fri, 19 Nov 2021 15:43:44 +0100
+
diff --git a/proxmox-async/debian/debcargo.toml b/proxmox-async/debian/debcargo.toml
new file mode 100644 (file)
index 0000000..1e7ee9f
--- /dev/null
@@ -0,0 +1,10 @@
+overlay = "."
+crate_src_path = ".."
+maintainer = "Proxmox Support Team <support@proxmox.com>"
+
+[source]
+vcs_git = "git://git.proxmox.com/git/proxmox.git"
+vcs_browser = "https://git.proxmox.com/?p=proxmox.git"
+
+[packages.lib]
+depends = [ "uuid-dev" ]
diff --git a/proxmox-async/src/blocking.rs b/proxmox-async/src/blocking.rs
new file mode 100644 (file)
index 0000000..36d3f1e
--- /dev/null
@@ -0,0 +1,99 @@
+//! Async wrappers for blocking I/O (adding `block_in_place` around channels/readers)
+
+use std::io::{self, Read};
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::sync::mpsc::Receiver;
+
+use futures::stream::Stream;
+
+use crate::runtime::block_in_place;
+
+/// Wrapper struct to convert a Reader into a Stream
+pub struct WrappedReaderStream<R: Read + Unpin> {
+    reader: R,
+    buffer: Vec<u8>,
+}
+
+impl <R: Read + Unpin> WrappedReaderStream<R> {
+
+    pub fn new(reader: R) -> Self {
+        let mut buffer = Vec::with_capacity(64*1024);
+        unsafe { buffer.set_len(buffer.capacity()); }
+        Self { reader, buffer }
+    }
+}
+
+impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
+    type Item = Result<Vec<u8>, io::Error>;
+
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+        match block_in_place(|| this.reader.read(&mut this.buffer)) {
+            Ok(n) => {
+                if n == 0 {
+                    // EOF
+                    Poll::Ready(None)
+                } else {
+                    Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
+                }
+            }
+            Err(err) => Poll::Ready(Some(Err(err))),
+        }
+    }
+}
+
+/// Wrapper struct to convert a channel Receiver into a Stream
+pub struct StdChannelStream<T>(pub Receiver<T>);
+
+impl<T> Stream for StdChannelStream<T> {
+    type Item = T;
+
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
+        match block_in_place(|| self.0.recv()) {
+            Ok(data) => Poll::Ready(Some(data)),
+            Err(_) => Poll::Ready(None),// channel closed
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::io;
+
+    use anyhow::Error;
+    use futures::stream::TryStreamExt;
+
+    #[test]
+    fn test_wrapped_stream_reader() -> Result<(), Error> {
+        crate::runtime::main(async {
+            run_wrapped_stream_reader_test().await
+        })
+    }
+
+    struct DummyReader(usize);
+
+    impl io::Read for DummyReader {
+        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+            self.0 += 1;
+
+            if self.0 >= 10 {
+                return Ok(0);
+            }
+
+            unsafe {
+                std::ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len());
+            }
+
+            Ok(buf.len())
+        }
+    }
+
+    async fn run_wrapped_stream_reader_test() -> Result<(), Error> {
+        let mut reader = super::WrappedReaderStream::new(DummyReader(0));
+        while let Some(_data) = reader.try_next().await? {
+            // just waiting
+        }
+        Ok(())
+    }
+}
diff --git a/proxmox-async/src/lib.rs b/proxmox-async/src/lib.rs
new file mode 100644 (file)
index 0000000..340711c
--- /dev/null
@@ -0,0 +1,2 @@
+pub mod blocking;
+pub mod runtime;
diff --git a/proxmox-async/src/runtime.rs b/proxmox-async/src/runtime.rs
new file mode 100644 (file)
index 0000000..baa7ded
--- /dev/null
@@ -0,0 +1,203 @@
+//! Helpers for quirks of the current tokio runtime.
+
+use std::cell::RefCell;
+use std::future::Future;
+use std::sync::{Arc, Weak, Mutex};
+use std::task::{Context, Poll, RawWaker, Waker};
+use std::thread::{self, Thread};
+
+use lazy_static::lazy_static;
+use pin_utils::pin_mut;
+use tokio::runtime::{self, Runtime};
+
+thread_local! {
+    static BLOCKING: RefCell<bool> = RefCell::new(false);
+}
+
+fn is_in_tokio() -> bool {
+    tokio::runtime::Handle::try_current()
+        .is_ok()
+}
+
+fn is_blocking() -> bool {
+    BLOCKING.with(|v| *v.borrow())
+}
+
+struct BlockingGuard(bool);
+
+impl BlockingGuard {
+    fn set() -> Self {
+        Self(BLOCKING.with(|v| {
+            let old = *v.borrow();
+            *v.borrow_mut() = true;
+            old
+        }))
+    }
+}
+
+impl Drop for BlockingGuard {
+    fn drop(&mut self) {
+        BLOCKING.with(|v| {
+            *v.borrow_mut() = self.0;
+        });
+    }
+}
+
+lazy_static! {
+    // avoid openssl bug: https://github.com/openssl/openssl/issues/6214
+    // by dropping the runtime as early as possible
+    static ref RUNTIME: Mutex<Weak<Runtime>> = Mutex::new(Weak::new());
+}
+
+#[link(name = "crypto")]
+extern "C" {
+    fn OPENSSL_thread_stop();
+}
+
+/// Get or create the current main tokio runtime.
+///
+/// This makes sure that tokio's worker threads are marked for us so that we know whether we
+/// can/need to use `block_in_place` in our `block_on` helper.
+pub fn get_runtime_with_builder<F: Fn() -> runtime::Builder>(get_builder: F) -> Arc<Runtime> {
+
+    let mut guard = RUNTIME.lock().unwrap();
+
+    if let Some(rt) = guard.upgrade() { return rt; }
+
+    let mut builder = get_builder();
+    builder.on_thread_stop(|| {
+        // avoid openssl bug: https://github.com/openssl/openssl/issues/6214
+        // call OPENSSL_thread_stop to avoid race with openssl cleanup handlers
+        unsafe { OPENSSL_thread_stop(); }
+    });
+
+    let runtime = builder.build().expect("failed to spawn tokio runtime");
+    let rt = Arc::new(runtime);
+
+    *guard = Arc::downgrade(&rt);
+
+    rt
+}
+
+/// Get or create the current main tokio runtime.
+///
+/// This calls get_runtime_with_builder() using the tokio default threaded scheduler
+pub fn get_runtime() -> Arc<Runtime> {
+
+    get_runtime_with_builder(|| {
+        let mut builder = runtime::Builder::new_multi_thread();
+        builder.enable_all();
+        builder
+    })
+}
+
+
+/// Block on a synchronous piece of code.
+pub fn block_in_place<R>(fut: impl FnOnce() -> R) -> R {
+    // don't double-exit the context (tokio doesn't like that)
+    // also, if we're not actually in a tokio-worker we must not use block_in_place() either
+    if is_blocking() || !is_in_tokio() {
+        fut()
+    } else {
+        // we are in an actual tokio worker thread, block it:
+        tokio::task::block_in_place(move || {
+            let _guard = BlockingGuard::set();
+            fut()
+        })
+    }
+}
+
+/// Block on a future in this thread.
+pub fn block_on<F: Future>(fut: F) -> F::Output {
+    // don't double-exit the context (tokio doesn't like that)
+    if is_blocking() {
+        block_on_local_future(fut)
+    } else if is_in_tokio() {
+        // inside a tokio worker we need to tell tokio that we're about to really block:
+        tokio::task::block_in_place(move || {
+            let _guard = BlockingGuard::set();
+            block_on_local_future(fut)
+        })
+    } else {
+        // not a worker thread, not associated with a runtime, make sure we have a runtime (spawn
+        // it on demand if necessary), then enter it
+        let _guard = BlockingGuard::set();
+        let _enter_guard = get_runtime().enter();
+        get_runtime().block_on(fut)
+    }
+}
+
+/*
+fn block_on_impl<F>(mut fut: F) -> F::Output
+where
+    F: Future + Send,
+    F::Output: Send + 'static,
+{
+    let (tx, rx) = tokio::sync::oneshot::channel();
+    let fut_ptr = &mut fut as *mut F as usize; // hack to not require F to be 'static
+    tokio::spawn(async move {
+        let fut: F = unsafe { std::ptr::read(fut_ptr as *mut F) };
+        tx
+            .send(fut.await)
+            .map_err(drop)
+            .expect("failed to send block_on result to channel")
+    });
+
+    futures::executor::block_on(async move {
+        rx.await.expect("failed to receive block_on result from channel")
+    })
+    std::mem::forget(fut);
+}
+*/
+
+/// This used to be our tokio main entry point. Now this just calls out to `block_on` for
+/// compatibility, which will perform all the necessary tasks on-demand anyway.
+pub fn main<F: Future>(fut: F) -> F::Output {
+    block_on(fut)
+}
+
+fn block_on_local_future<F: Future>(fut: F) -> F::Output {
+    pin_mut!(fut);
+
+    let waker = Arc::new(thread::current());
+    let waker = thread_waker_clone(Arc::into_raw(waker) as *const ());
+    let waker = unsafe { Waker::from_raw(waker) };
+    let mut context = Context::from_waker(&waker);
+    loop {
+        match fut.as_mut().poll(&mut context) {
+            Poll::Ready(out) => return out,
+            Poll::Pending => thread::park(),
+        }
+    }
+}
+
+const THREAD_WAKER_VTABLE: std::task::RawWakerVTable = std::task::RawWakerVTable::new(
+    thread_waker_clone,
+    thread_waker_wake,
+    thread_waker_wake_by_ref,
+    thread_waker_drop,
+);
+
+fn thread_waker_clone(this: *const ()) -> RawWaker {
+    let this = unsafe { Arc::from_raw(this as *const Thread) };
+    let cloned = Arc::clone(&this);
+    let _ = Arc::into_raw(this);
+
+    RawWaker::new(Arc::into_raw(cloned) as *const (), &THREAD_WAKER_VTABLE)
+}
+
+fn thread_waker_wake(this: *const ()) {
+    let this = unsafe { Arc::from_raw(this as *const Thread) };
+    this.unpark();
+}
+
+fn thread_waker_wake_by_ref(this: *const ()) {
+    let this = unsafe { Arc::from_raw(this as *const Thread) };
+    this.unpark();
+    let _ = Arc::into_raw(this);
+}
+
+fn thread_waker_drop(this: *const ()) {
+    let this = unsafe { Arc::from_raw(this as *const Thread) };
+    drop(this);
+}