]> git.proxmox.com Git - pxar.git/blobdiff - src/encoder/mod.rs
don't hold temp buffer mutex across await point
[pxar.git] / src / encoder / mod.rs
index 516b14c7b1a09cfa3dbae17c9773a4e5b3213b0e..d14b2d04765bce11378af2de2007205675f64635 100644 (file)
@@ -2,6 +2,8 @@
 //!
 //! This is the implementation used by both the synchronous and async pxar wrappers.
 
+#![deny(missing_docs)]
+
 use std::io;
 use std::mem::{forget, size_of, size_of_val, take};
 use std::os::unix::ffi::OsStrExt;
@@ -29,6 +31,7 @@ pub use sync::Encoder;
 pub struct LinkOffset(u64);
 
 impl LinkOffset {
+    /// Get the raw byte offset of this link.
     #[inline]
     pub fn raw(self) -> u64 {
         self.0
@@ -41,12 +44,23 @@ impl LinkOffset {
 /// synchronous wrapper and for both `tokio` and `future` `AsyncWrite` types in the asynchronous
 /// wrapper.
 pub trait SeqWrite {
+    /// Attempt to perform a sequential write to the file. On success, the number of written bytes
+    /// is returned as `Poll::Ready(Ok(bytes))`.
+    ///
+    /// If writing is not yet possible, `Poll::Pending` is returned and the current task will be
+    /// notified via the `cx.waker()` when writing becomes possible.
     fn poll_seq_write(
         self: Pin<&mut Self>,
         cx: &mut Context,
         buf: &[u8],
     ) -> Poll<io::Result<usize>>;
 
+    /// Attempt to flush the output, ensuring that all buffered data reaches the destination.
+    ///
+    /// On success, returns `Poll::Ready(Ok(()))`.
+    ///
+    /// If flushing cannot complete immediately, `Poll::Pending` is returned and the current task
+    /// will be notified via `cx.waker()` when progress can be made.
     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
 }
 
@@ -301,7 +315,9 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
             state: EncoderState::default(),
             parent: None,
             finished: false,
-            file_copy_buffer: Arc::new(Mutex::new(crate::util::vec_new(1024 * 1024))),
+            file_copy_buffer: Arc::new(Mutex::new(unsafe {
+                crate::util::vec_new_uninitialized(1024 * 1024)
+            })),
         };
 
         this.encode_metadata(metadata).await?;
@@ -366,6 +382,31 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         })
     }
 
+    fn take_file_copy_buffer(&self) -> Vec<u8> {
+        let buf: Vec<_> = take(
+            &mut self
+                .file_copy_buffer
+                .lock()
+                .expect("failed to lock temporary buffer mutex"),
+        );
+        if buf.len() < 1024 * 1024 {
+            drop(buf);
+            unsafe { crate::util::vec_new_uninitialized(1024 * 1024) }
+        } else {
+            buf
+        }
+    }
+
+    fn put_file_copy_buffer(&self, buf: Vec<u8>) {
+        let mut lock = self
+            .file_copy_buffer
+            .lock()
+            .expect("failed to lock temporary buffer mutex");
+        if lock.len() < buf.len() {
+            *lock = buf;
+        }
+    }
+
     /// Return a file offset usable with `add_hardlink`.
     pub async fn add_file(
         &mut self,
@@ -374,9 +415,8 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         file_size: u64,
         content: &mut dyn SeqRead,
     ) -> io::Result<LinkOffset> {
-        let buf = Arc::clone(&self.file_copy_buffer);
+        let mut buf = self.take_file_copy_buffer();
         let mut file = self.create_file(metadata, file_name, file_size).await?;
-        let mut buf = buf.lock().expect("failed to lock temporary buffer mutex");
         loop {
             let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
             if got == 0 {
@@ -385,7 +425,10 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
                 file.write_all(&buf[..got]).await?;
             }
         }
-        Ok(file.file_offset())
+        let offset = file.file_offset();
+        drop(file);
+        self.put_file_copy_buffer(buf);
+        Ok(offset)
     }
 
     /// Return a file offset usable with `add_hardlink`.
@@ -539,7 +582,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         self.encode_filename(file_name).await?;
 
         let entry_offset = self.position();
-        self.encode_metadata(&metadata).await?;
+        self.encode_metadata(metadata).await?;
 
         let files_offset = self.position();
 
@@ -572,7 +615,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     ) -> io::Result<()> {
         self.encode_filename(file_name).await?;
         if let Some(metadata) = metadata {
-            self.encode_metadata(&metadata).await?;
+            self.encode_metadata(metadata).await?;
         }
         Ok(())
     }
@@ -758,6 +801,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
 
         let mut bst = Vec::with_capacity(tail.len() + 1);
+        #[allow(clippy::uninit_vec)]
         unsafe {
             bst.set_len(tail.len());
         }
@@ -790,7 +834,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 }
 
 /// Writer for a file object in a directory.
-pub struct FileImpl<'a, S: SeqWrite> {
+pub(crate) struct FileImpl<'a, S: SeqWrite> {
     output: &'a mut S,
 
     /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it