]> git.proxmox.com Git - proxmox-backup.git/commitdiff
fix #3359: fix blocking writes in async code during pxar create
authorDominik Csapak <d.csapak@proxmox.com>
Tue, 23 Mar 2021 10:12:20 +0000 (11:12 +0100)
committerDietmar Maurer <dietmar@proxmox.com>
Wed, 24 Mar 2021 08:00:07 +0000 (09:00 +0100)
in commit `asyncify pxar create_archive`, we changed from a
separate thread for creating a pxar to using async code, but the
StdChannelWriter used for both pxar and catalog can block, which
may block the tokio runtime for single (and probably dual) core
environments

this patch adds a wrapper struct for any writer that implements
'std::io::Write' and wraps the write calls with 'block_in_place'
so that if called in a tokio runtime, it knows that this code
potentially blocks

Fixes: 6afb60abf557 ("asyncify pxar create_archive")
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
src/bin/proxmox-backup-client.rs
src/client/pxar_backup_stream.rs
src/tools.rs
src/tools/tokio_writer_adapter.rs [new file with mode: 0644]

index 5aae08734795aec5b893c78ac3a04da298cd1d1e..45b26c7ad283c2e3209333a7e86b9dbfccf6d2f4 100644 (file)
@@ -32,7 +32,11 @@ use proxmox::{
 };
 use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
 
-use proxmox_backup::tools;
+use proxmox_backup::tools::{
+    self,
+    StdChannelWriter,
+    TokioWriterAdapter,
+};
 use proxmox_backup::api2::types::*;
 use proxmox_backup::api2::version;
 use proxmox_backup::client::*;
@@ -162,7 +166,7 @@ async fn backup_directory<P: AsRef<Path>>(
     dir_path: P,
     archive_name: &str,
     chunk_size: Option<usize>,
-    catalog: Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
+    catalog: Arc<Mutex<CatalogWriter<TokioWriterAdapter<StdChannelWriter>>>>,
     pxar_create_options: proxmox_backup::pxar::PxarCreateOptions,
     upload_options: UploadOptions,
 ) -> Result<BackupStats, Error> {
@@ -460,7 +464,7 @@ async fn start_garbage_collection(param: Value) -> Result<Value, Error> {
 }
 
 struct CatalogUploadResult {
-    catalog_writer: Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
+    catalog_writer: Arc<Mutex<CatalogWriter<TokioWriterAdapter<StdChannelWriter>>>>,
     result: tokio::sync::oneshot::Receiver<Result<BackupStats, Error>>,
 }
 
@@ -473,7 +477,7 @@ fn spawn_catalog_upload(
     let catalog_chunk_size = 512*1024;
     let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
 
-    let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(crate::tools::StdChannelWriter::new(catalog_tx))?));
+    let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(StdChannelWriter::new(catalog_tx)))?));
 
     let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel();
 
index b57061a38bbe2444f7391ecf4add6b358b10c2ce..035f735c7ab81bf81316fd60e35075820beb54b8 100644 (file)
@@ -13,6 +13,10 @@ use nix::fcntl::OFlag;
 use nix::sys::stat::Mode;
 
 use crate::backup::CatalogWriter;
+use crate::tools::{
+    StdChannelWriter,
+    TokioWriterAdapter,
+};
 
 /// Stream implementation to encode and upload .pxar archives.
 ///
@@ -45,10 +49,10 @@ impl PxarBackupStream {
         let error = Arc::new(Mutex::new(None));
         let error2 = Arc::clone(&error);
         let handler = async move {
-            let writer = std::io::BufWriter::with_capacity(
+            let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
                 buffer_size,
-                crate::tools::StdChannelWriter::new(tx),
-            );
+                StdChannelWriter::new(tx),
+            ));
 
             let verbose = options.verbose;
 
index cc782da2ca99e4c20923e0cc54a9c8c900c259cf..7e3bff7baf00a620876c729e448c63bd6fb0fc0c 100644 (file)
@@ -57,6 +57,9 @@ pub use async_channel_writer::AsyncChannelWriter;
 mod std_channel_writer;
 pub use std_channel_writer::StdChannelWriter;
 
+mod tokio_writer_adapter;
+pub use tokio_writer_adapter::TokioWriterAdapter;
+
 mod process_locker;
 pub use process_locker::{ProcessLocker, ProcessLockExclusiveGuard, ProcessLockSharedGuard};
 
diff --git a/src/tools/tokio_writer_adapter.rs b/src/tools/tokio_writer_adapter.rs
new file mode 100644 (file)
index 0000000..7b7f5dc
--- /dev/null
@@ -0,0 +1,26 @@
+use std::io::Write;
+
+use tokio::task::block_in_place;
+
+/// Wrapper around a writer which implements Write
+///
+/// wraps each write with a 'block_in_place' so that
+/// any (blocking) writer can be safely used in async context in a
+/// tokio runtime
+pub struct TokioWriterAdapter<W: Write>(W);
+
+impl<W: Write> TokioWriterAdapter<W> {
+    pub fn new(writer: W) -> Self {
+        Self(writer)
+    }
+}
+
+impl<W: Write> Write for TokioWriterAdapter<W> {
+    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
+        block_in_place(|| self.0.write(buf))
+    }
+
+    fn flush(&mut self) -> Result<(), std::io::Error> {
+        block_in_place(|| self.0.flush())
+    }
+}