};
use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
-use proxmox_backup::tools;
+use proxmox_backup::tools::{
+ self,
+ StdChannelWriter,
+ TokioWriterAdapter,
+};
use proxmox_backup::api2::types::*;
use proxmox_backup::api2::version;
use proxmox_backup::client::*;
dir_path: P,
archive_name: &str,
chunk_size: Option<usize>,
- catalog: Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
+ catalog: Arc<Mutex<CatalogWriter<TokioWriterAdapter<StdChannelWriter>>>>,
pxar_create_options: proxmox_backup::pxar::PxarCreateOptions,
upload_options: UploadOptions,
) -> Result<BackupStats, Error> {
}
struct CatalogUploadResult {
- catalog_writer: Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
+ catalog_writer: Arc<Mutex<CatalogWriter<TokioWriterAdapter<StdChannelWriter>>>>,
result: tokio::sync::oneshot::Receiver<Result<BackupStats, Error>>,
}
let catalog_chunk_size = 512*1024;
let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
- let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(crate::tools::StdChannelWriter::new(catalog_tx))?));
+ let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(StdChannelWriter::new(catalog_tx)))?));
let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel();
use nix::sys::stat::Mode;
use crate::backup::CatalogWriter;
+use crate::tools::{
+ StdChannelWriter,
+ TokioWriterAdapter,
+};
/// Stream implementation to encode and upload .pxar archives.
///
let error = Arc::new(Mutex::new(None));
let error2 = Arc::clone(&error);
let handler = async move {
- let writer = std::io::BufWriter::with_capacity(
+ let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
buffer_size,
- crate::tools::StdChannelWriter::new(tx),
- );
+ StdChannelWriter::new(tx),
+ ));
let verbose = options.verbose;
mod std_channel_writer;
pub use std_channel_writer::StdChannelWriter;
+mod tokio_writer_adapter;
+pub use tokio_writer_adapter::TokioWriterAdapter;
+
mod process_locker;
pub use process_locker::{ProcessLocker, ProcessLockExclusiveGuard, ProcessLockSharedGuard};
--- /dev/null
+use std::io::Write;
+
+use tokio::task::block_in_place;
+
+/// Wrapper around a writer which implements Write
+///
+/// wraps each write with a 'block_in_place' so that
+/// any (blocking) writer can be safely used in async context in a
+/// tokio runtime
+pub struct TokioWriterAdapter<W: Write>(W);
+
+impl<W: Write> TokioWriterAdapter<W> {
+ pub fn new(writer: W) -> Self {
+ Self(writer)
+ }
+}
+
+impl<W: Write> Write for TokioWriterAdapter<W> {
+ fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
+ block_in_place(|| self.0.write(buf))
+ }
+
+ fn flush(&mut self) -> Result<(), std::io::Error> {
+ block_in_place(|| self.0.flush())
+ }
+}