]> git.proxmox.com Git - proxmox-backup.git/commitdiff
introduce new runtime tokio helpers
authorWolfgang Bumiller <w.bumiller@proxmox.com>
Mon, 20 Jan 2020 11:52:22 +0000 (12:52 +0100)
committerWolfgang Bumiller <w.bumiller@proxmox.com>
Mon, 20 Jan 2020 12:12:40 +0000 (13:12 +0100)
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
13 files changed:
src/backup/catalog.rs
src/bin/download-speed.rs
src/bin/h2client.rs
src/bin/h2s-client.rs
src/bin/h2s-server.rs
src/bin/h2server.rs
src/bin/proxmox-backup-api.rs
src/bin/proxmox-backup-proxy.rs
src/bin/test_chunk_speed2.rs
src/bin/upload-speed.rs
src/client/remote_chunk_reader.rs
src/tools/runtime.rs
src/tools/wrapped_reader_stream.rs

index 65605e6e536766e630ad5e29c3a0fde82ff92777..9cbf275bc8e119c5d2df12279a9372767339f994 100644 (file)
@@ -8,10 +8,12 @@ use std::convert::TryFrom;
 use chrono::offset::{TimeZone, Local};
 
 use proxmox::tools::io::ReadExt;
+use proxmox::sys::error::io_err_other;
 
 use crate::pxar::catalog::BackupCatalogWriter;
 use crate::pxar::{MatchPattern, MatchPatternSlice, MatchType};
 use crate::backup::file_formats::PROXMOX_CATALOG_FILE_MAGIC_1_0;
+use crate::tools::runtime::block_on;
 
 #[repr(u8)]
 #[derive(Copy,Clone,PartialEq)]
@@ -384,12 +386,12 @@ impl SenderWriter {
 
 impl Write for SenderWriter {
     fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
-        tokio::task::block_in_place(|| {
-            futures::executor::block_on(async move {
-                self.0.send(Ok(buf.to_vec())).await
-                    .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))?;
-                Ok(buf.len())
-            })
+        block_on(async move {
+            self.0
+                .send(Ok(buf.to_vec()))
+                .await
+                .map_err(io_err_other)
+                .and(Ok(buf.len()))
         })
     }
 
index 3e4010a28f92f046d79dd3f800408d9345544620..c4b36217b60e6574be4b1627cf5e65cb365e9120 100644 (file)
@@ -57,7 +57,7 @@ async fn run() -> Result<(), Error> {
 
 #[tokio::main]
 async fn main() {
-    if let Err(err) = run().await {
+    if let Err(err) = proxmox_backup::tools::runtime::main(run()) {
         eprintln!("ERROR: {}", err);
     }
     println!("DONE");
index 542ecff0b96b318e8e649b0b3359ce09a87d41dc..b82ec55d3f4f62e15c7fd958e4fb79b44eb42bc3 100644 (file)
@@ -69,8 +69,11 @@ fn send_request(
         })
 }
 
-#[tokio::main]
-async fn main() -> Result<(), Error> {
+fn main() -> Result<(), Error> {
+    proxmox_backup::tools::runtime::main(run())
+}
+
+async fn run() -> Result<(), Error> {
 
     let start = std::time::SystemTime::now();
 
index df9a95b80380f23bac8e89774755f7d8f8913738..6c848cd2171b8d95694d3021ef6947dabb90f05a 100644 (file)
@@ -67,8 +67,11 @@ fn send_request(
         })
 }
 
-#[tokio::main]
-async fn main() -> Result<(), Error> {
+fn main() -> Result<(), Error> {
+    proxmox_backup::tools::runtime::main(run())
+}
+
+async fn run() -> Result<(), Error> {
     let start = std::time::SystemTime::now();
 
     let conn =
index 39483af24446dac1478217817110d15af9ef026b..5218ccccecee5130392c5f205b5f51364a0a71ee 100644 (file)
@@ -10,8 +10,11 @@ use proxmox_backup::configdir;
 
 // Simple H2 server to test H2 speed with h2s-client.rs
 
-#[tokio::main]
-async fn main() -> Result<(), Error> {
+fn main() -> Result<(), Error> {
+    proxmox_backup::tools::runtime::main(run())
+}
+
+async fn run() -> Result<(), Error> {
     let key_path = configdir!("/proxy.key");
     let cert_path = configdir!("/proxy.pem");
 
index 3d602134cd945784a95b8cbc5bae644003e49b11..5e31288d44d0c839db5f687a79c73a00bdd91124 100644 (file)
@@ -8,8 +8,11 @@ use tokio::io::{AsyncRead, AsyncWrite};
 
 use proxmox_backup::client::pipe_to_stream::PipeToSendStream;
 
-#[tokio::main]
-async fn main() -> Result<(), Error> {
+fn main() -> Result<(), Error> {
+    proxmox_backup::tools::runtime::main(run())
+}
+
+async fn run() -> Result<(), Error> {
     let mut listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
 
     println!("listening on {:?}", listener.local_addr());
index baee4a41a10a52ed224a85fb955ce57f318a00f0..a1ac399ab160c0cd682dd14cf0f6f01adeae6a26 100644 (file)
@@ -13,9 +13,8 @@ use proxmox_backup::auth_helpers::*;
 use proxmox_backup::config;
 use proxmox_backup::buildcfg;
 
-#[tokio::main]
-async fn main() {
-    if let Err(err) = run().await {
+fn main() {
+    if let Err(err) = proxmox_backup::tools::runtime::main(run()) {
         eprintln!("Error: {}", err);
         std::process::exit(-1);
     }
index ccf7c88a3d01bc1b2ac2a1e0c06550ffd5d1862f..fdd54cadff2274c2d1e2731ac7622a41ac47c2b5 100644 (file)
@@ -15,9 +15,8 @@ use proxmox_backup::tools::daemon;
 use proxmox_backup::server::{ApiConfig, rest::*};
 use proxmox_backup::auth_helpers::*;
 
-#[tokio::main]
-async fn main() {
-    if let Err(err) = run().await {
+fn main() {
+    if let Err(err) = proxmox_backup::tools::runtime::main(run()) {
         eprintln!("Error: {}", err);
         std::process::exit(-1);
     }
index 27f7a1570621200087678356c9c501a8dd98ab2f..7d061a5dce141697f98f666eb34142d5606b6d56 100644 (file)
@@ -12,9 +12,8 @@ use proxmox_backup::backup::*;
 //
 // Note: I can currently get about 830MB/s
 
-#[tokio::main]
-async fn main() {
-    if let Err(err) = run().await {
+fn main() {
+    if let Err(err) = proxmox_backup::tools::runtime::main(run()) {
         panic!("ERROR: {}", err);
     }
 }
index 2b9a50b47cddffc7081e6173585c274030895660..8fcf4f4db4def4a9f71ba46517ee7d703acdc783 100644 (file)
@@ -21,9 +21,8 @@ async fn upload_speed() -> Result<usize, Error> {
     Ok(res)
 }
 
-#[tokio::main]
-async fn main()  {
-    match upload_speed().await {
+fn main()  {
+    match proxmox_backup::tools::runtime::main(upload_speed()) {
         Ok(mbs) => {
             println!("average upload speed: {} MB/s", mbs);
         }
index aeb821832047a382b898462e8f78b943e3b43b76..d997d3504ba5b67784369c8995df15633b6c584d 100644 (file)
@@ -5,6 +5,7 @@ use failure::*;
 
 use super::BackupReader;
 use crate::backup::{ReadChunk, DataBlob, CryptConfig};
+use crate::tools::runtime::block_on;
 
 /// Read chunks from remote host using ``BackupReader``
 pub struct RemoteChunkReader {
@@ -35,7 +36,14 @@ impl ReadChunk for RemoteChunkReader {
 
         let mut chunk_data = Vec::with_capacity(4*1024*1024);
 
-        tokio::task::block_in_place(|| futures::executor::block_on(self.client.download_chunk(&digest, &mut chunk_data)))?;
+        //tokio::task::block_in_place(|| futures::executor::block_on(self.client.download_chunk(&digest, &mut chunk_data)))?;
+        block_on(async {
+            // download_chunk returns the writer back to us, but we need to return a 'static value
+            self.client
+                .download_chunk(&digest, &mut chunk_data)
+                .await
+                .map(drop)
+        })?;
 
         let chunk = DataBlob::from_raw(chunk_data)?;
         chunk.verify_crc()?;
index f5e2ca92c5b9492f642118118935f58d4bfc5dd6..92155f05f7ce9d473bf611db89fce96c6a59bbd5 100644 (file)
 //! Helpers for quirks of the current tokio runtime.
 
+use std::cell::RefCell;
 use std::future::Future;
 
-pub fn main<F, T>(fut: F) -> T
+use lazy_static::lazy_static;
+use tokio::runtime::{self, Runtime};
+
+thread_local! {
+    static HAS_RUNTIME: RefCell<bool> = RefCell::new(false);
+    static IN_TOKIO: RefCell<bool> = RefCell::new(false);
+}
+
+fn is_in_tokio() -> bool {
+    IN_TOKIO.with(|v| *v.borrow())
+}
+
+fn has_runtime() -> bool {
+    HAS_RUNTIME.with(|v| *v.borrow())
+}
+
+struct RuntimeGuard(bool);
+
+impl RuntimeGuard {
+    fn enter() -> Self {
+        Self(HAS_RUNTIME.with(|v| {
+            let old = *v.borrow();
+            *v.borrow_mut() = true;
+            old
+        }))
+    }
+}
+
+impl Drop for RuntimeGuard {
+    fn drop(&mut self) {
+        HAS_RUNTIME.with(|v| {
+            *v.borrow_mut() = self.0;
+        });
+    }
+}
+
+lazy_static! {
+    static ref RUNTIME: Runtime = {
+        runtime::Builder::new()
+            .threaded_scheduler()
+            .enable_all()
+            .on_thread_start(|| IN_TOKIO.with(|v| *v.borrow_mut() = true))
+            .build()
+            .expect("failed to spawn tokio runtime")
+    };
+}
+
+/// Get or create the current main tokio runtime.
+///
+/// This makes sure that tokio's worker threads are marked for us so that we know whether we
+/// can/need to use `block_in_place` in our `block_on` helper.
+pub fn get_runtime() -> &'static Runtime {
+    &RUNTIME
+}
+
+/// Associate the current newly spawned thread with the main tokio runtime.
+pub fn enter_runtime<R>(f: impl FnOnce() -> R) -> R {
+    let _guard = RuntimeGuard::enter();
+    get_runtime().enter(f)
+}
+
+/// Block on a synchronous piece of code.
+pub fn block_in_place<R>(fut: impl FnOnce() -> R) -> R {
+    if is_in_tokio() {
+        // we are in an actual tokio worker thread, block it:
+        tokio::task::block_in_place(fut)
+    } else {
+        // we're not inside a tokio worker, so just run the code:
+        fut()
+    }
+}
+
+/// Block on a future in this thread.
+pub fn block_on<R, F>(fut: F) -> R
 where
-    F: Future<Output = T> + Send + 'static,
-    T: std::fmt::Debug + Send + 'static,
+    R: Send + 'static,
+    F: Future<Output = R> + Send,
 {
-    let mut rt = tokio::runtime::Runtime::new().unwrap();
-    rt.block_on(async {
-        let (tx, rx) = tokio::sync::oneshot::channel();
 
-        tokio::spawn(async move {
-            tx.send(fut.await).unwrap()
-        });
+    if is_in_tokio() {
+        // inside a tokio worker we need to tell tokio that we're about to really block:
+        tokio::task::block_in_place(move || futures::executor::block_on(fut))
+    } else if has_runtime() {
+        // we're already associated with a runtime, but we're not a worker-thread, we can just
+        // block this thread directly
+        // This is not strictly necessary, but it's a bit quicker tha the else branch below.
+        futures::executor::block_on(fut)
+    } else {
+        // not a worker thread, not associated with a runtime, make sure we have a runtime (spawn
+        // it on demand if necessary), then enter it:
+        enter_runtime(move || futures::executor::block_on(fut))
+    }
+}
 
-        rx.await.unwrap()
+/*
+fn block_on_impl<F>(mut fut: F) -> F::Output
+where
+    F: Future + Send,
+    F::Output: Send + 'static,
+{
+    let (tx, rx) = tokio::sync::oneshot::channel();
+    let fut_ptr = &mut fut as *mut F as usize; // hack to not require F to be 'static
+    tokio::spawn(async move {
+        let fut: F = unsafe { std::ptr::read(fut_ptr as *mut F) };
+        tx
+            .send(fut.await)
+            .map_err(drop)
+            .expect("failed to send block_on result to channel")
+    });
+
+    futures::executor::block_on(async move {
+        rx.await.expect("failed to receive block_on result from channel")
     })
+    std::mem::forget(fut);
+}
+*/
+
+/// This used to be our tokio main entry point. Now this just calls out to `block_on` for
+/// compatibility, which will perform all the necessary tasks on-demand anyway.
+pub fn main<F>(fut: F) -> F::Output
+where
+    F: Future + Send,
+    F::Output: Send + 'static,
+{
+    block_on(fut)
 }
index 422b17cdfe9dddb4de41c346c448ee7ec8eff994..927132dcf255feb1c0a2e695a5cbe90db3961eb0 100644 (file)
@@ -2,9 +2,10 @@ use std::io::{self, Read};
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
-use tokio::task::block_in_place;
 use futures::stream::Stream;
 
+use crate::tools::runtime::block_in_place;
+
 pub struct WrappedReaderStream<R: Read + Unpin> {
     reader: R,
     buffer: Vec<u8>,