]> git.proxmox.com Git - pxar.git/blobdiff - src/encoder/mod.rs
don't hold temp buffer mutex across await point
[pxar.git] / src / encoder / mod.rs
index 1004efaf9aba9e9cb3658c2250d5e2e386530652..d14b2d04765bce11378af2de2007205675f64635 100644 (file)
@@ -2,6 +2,8 @@
 //!
 //! This is the implementation used by both the synchronous and async pxar wrappers.
 
+#![deny(missing_docs)]
+
 use std::io;
 use std::mem::{forget, size_of, size_of_val, take};
 use std::os::unix::ffi::OsStrExt;
@@ -29,6 +31,7 @@ pub use sync::Encoder;
 pub struct LinkOffset(u64);
 
 impl LinkOffset {
+    /// Get the raw byte offset of this link.
     #[inline]
     pub fn raw(self) -> u64 {
         self.0
@@ -41,12 +44,23 @@ impl LinkOffset {
 /// synchronous wrapper and for both `tokio` and `future` `AsyncWrite` types in the asynchronous
 /// wrapper.
 pub trait SeqWrite {
+    /// Attempt to perform a sequential write to the file. On success, the number of written bytes
+    /// is returned as `Poll::Ready(Ok(bytes))`.
+    ///
+    /// If writing is not yet possible, `Poll::Pending` is returned and the current task will be
+    /// notified via the `cx.waker()` when writing becomes possible.
     fn poll_seq_write(
         self: Pin<&mut Self>,
         cx: &mut Context,
         buf: &[u8],
     ) -> Poll<io::Result<usize>>;
 
+    /// Attempt to flush the output, ensuring that all buffered data reaches the destination.
+    ///
+    /// On success, returns `Poll::Ready(Ok(()))`.
+    ///
+    /// If flushing cannot complete immediately, `Poll::Pending` is returned and the current task
+    /// will be notified via `cx.waker()` when progress can be made.
     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
 }
 
@@ -83,6 +97,11 @@ async fn seq_write<T: SeqWrite + ?Sized>(
     Ok(put)
 }
 
+/// awaitable version of 'poll_flush'.
+async fn flush<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<()> {
+    poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_flush(cx) }).await
+}
+
 /// Write the entire contents of a buffer, handling short writes.
 async fn seq_write_all<T: SeqWrite + ?Sized>(
     output: &mut T,
@@ -190,6 +209,7 @@ struct EncoderState {
     /// Offset of this directory's ENTRY.
     entry_offset: u64,
 
+    #[allow(dead_code)]
     /// Offset to this directory's first FILENAME.
     files_offset: u64,
 
@@ -223,7 +243,7 @@ pub(crate) enum EncoderOutput<'a, T> {
 
 impl<'a, T> EncoderOutput<'a, T> {
     #[inline]
-    fn to_borrowed<'s>(&'s mut self) -> EncoderOutput<'s, T>
+    fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
     where
         'a: 's,
     {
@@ -295,7 +315,9 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
             state: EncoderState::default(),
             parent: None,
             finished: false,
-            file_copy_buffer: Arc::new(Mutex::new(crate::util::vec_new(1024 * 1024))),
+            file_copy_buffer: Arc::new(Mutex::new(unsafe {
+                crate::util::vec_new_uninitialized(1024 * 1024)
+            })),
         };
 
         this.encode_metadata(metadata).await?;
@@ -360,6 +382,31 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         })
     }
 
+    fn take_file_copy_buffer(&self) -> Vec<u8> {
+        let buf: Vec<_> = take(
+            &mut self
+                .file_copy_buffer
+                .lock()
+                .expect("failed to lock temporary buffer mutex"),
+        );
+        if buf.len() < 1024 * 1024 {
+            drop(buf);
+            unsafe { crate::util::vec_new_uninitialized(1024 * 1024) }
+        } else {
+            buf
+        }
+    }
+
+    fn put_file_copy_buffer(&self, buf: Vec<u8>) {
+        let mut lock = self
+            .file_copy_buffer
+            .lock()
+            .expect("failed to lock temporary buffer mutex");
+        if lock.len() < buf.len() {
+            *lock = buf;
+        }
+    }
+
     /// Return a file offset usable with `add_hardlink`.
     pub async fn add_file(
         &mut self,
@@ -368,9 +415,8 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         file_size: u64,
         content: &mut dyn SeqRead,
     ) -> io::Result<LinkOffset> {
-        let buf = Arc::clone(&self.file_copy_buffer);
+        let mut buf = self.take_file_copy_buffer();
         let mut file = self.create_file(metadata, file_name, file_size).await?;
-        let mut buf = buf.lock().expect("failed to lock temporary buffer mutex");
         loop {
             let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
             if got == 0 {
@@ -379,7 +425,10 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
                 file.write_all(&buf[..got]).await?;
             }
         }
-        Ok(file.file_offset())
+        let offset = file.file_offset();
+        drop(file);
+        self.put_file_copy_buffer(buf);
+        Ok(offset)
     }
 
     /// Return a file offset usable with `add_hardlink`.
@@ -533,7 +582,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         self.encode_filename(file_name).await?;
 
         let entry_offset = self.position();
-        self.encode_metadata(&metadata).await?;
+        self.encode_metadata(metadata).await?;
 
         let files_offset = self.position();
 
@@ -544,7 +593,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
         Ok(EncoderImpl {
             // always forward as Borrowed(), to avoid stacking references on nested calls
-            output: self.output.to_borrowed(),
+            output: self.output.to_borrowed_mut(),
             state: EncoderState {
                 entry_offset,
                 files_offset,
@@ -566,7 +615,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     ) -> io::Result<()> {
         self.encode_filename(file_name).await?;
         if let Some(metadata) = metadata {
-            self.encode_metadata(&metadata).await?;
+            self.encode_metadata(metadata).await?;
         }
         Ok(())
     }
@@ -715,6 +764,10 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         )
         .await?;
 
+        if let EncoderOutput::Owned(output) = &mut self.output {
+            flush(output).await?;
+        }
+
         // done up here because of the self-borrow and to propagate
         let end_offset = self.position();
 
@@ -748,6 +801,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
 
         let mut bst = Vec::with_capacity(tail.len() + 1);
+        #[allow(clippy::uninit_vec)]
         unsafe {
             bst.set_len(tail.len());
         }
@@ -780,7 +834,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 }
 
 /// Writer for a file object in a directory.
-pub struct FileImpl<'a, S: SeqWrite> {
+pub(crate) struct FileImpl<'a, S: SeqWrite> {
     output: &'a mut S,
 
     /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it